1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import static java.util.concurrent.TimeUnit.NANOSECONDS; 39 40 import java.util.AbstractQueue; 41 import java.util.Collection; 42 import java.util.Iterator; 43 import java.util.NoSuchElementException; 44 import java.util.PriorityQueue; 45 import java.util.concurrent.locks.Condition; 46 import java.util.concurrent.locks.ReentrantLock; 47 48 // BEGIN android-note 49 // removed link to collections framework docs 50 // END android-note 51 52 /** 53 * An unbounded {@linkplain BlockingQueue blocking queue} of 54 * {@code Delayed} elements, in which an element can only be taken 55 * when its delay has expired. The <em>head</em> of the queue is that 56 * {@code Delayed} element whose delay expired furthest in the 57 * past. If no delay has expired there is no head and {@code poll} 58 * will return {@code null}. Expiration occurs when an element's 59 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less 60 * than or equal to zero. Even though unexpired elements cannot be 61 * removed using {@code take} or {@code poll}, they are otherwise 62 * treated as normal elements. For example, the {@code size} method 63 * returns the count of both expired and unexpired elements. 64 * This queue does not permit null elements. 65 * 66 * <p>This class and its iterator implement all of the 67 * <em>optional</em> methods of the {@link Collection} and {@link 68 * Iterator} interfaces. The Iterator provided in method {@link 69 * #iterator()} is <em>not</em> guaranteed to traverse the elements of 70 * the DelayQueue in any particular order. 71 * 72 * @since 1.5 73 * @author Doug Lea 74 * @param <E> the type of elements held in this queue 75 */ 76 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> 77 implements BlockingQueue<E> { 78 79 private final transient ReentrantLock lock = new ReentrantLock(); 80 private final PriorityQueue<E> q = new PriorityQueue<E>(); 81 82 /** 83 * Thread designated to wait for the element at the head of 84 * the queue. This variant of the Leader-Follower pattern 85 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to 86 * minimize unnecessary timed waiting. When a thread becomes 87 * the leader, it waits only for the next delay to elapse, but 88 * other threads await indefinitely. The leader thread must 89 * signal some other thread before returning from take() or 90 * poll(...), unless some other thread becomes leader in the 91 * interim. Whenever the head of the queue is replaced with 92 * an element with an earlier expiration time, the leader 93 * field is invalidated by being reset to null, and some 94 * waiting thread, but not necessarily the current leader, is 95 * signalled. So waiting threads must be prepared to acquire 96 * and lose leadership while waiting. 97 */ 98 private Thread leader; 99 100 /** 101 * Condition signalled when a newer element becomes available 102 * at the head of the queue or a new thread may need to 103 * become leader. 104 */ 105 private final Condition available = lock.newCondition(); 106 107 /** 108 * Creates a new {@code DelayQueue} that is initially empty. 109 */ DelayQueue()110 public DelayQueue() {} 111 112 /** 113 * Creates a {@code DelayQueue} initially containing the elements of the 114 * given collection of {@link Delayed} instances. 115 * 116 * @param c the collection of elements to initially contain 117 * @throws NullPointerException if the specified collection or any 118 * of its elements are null 119 */ DelayQueue(Collection<? extends E> c)120 public DelayQueue(Collection<? extends E> c) { 121 this.addAll(c); 122 } 123 124 /** 125 * Inserts the specified element into this delay queue. 126 * 127 * @param e the element to add 128 * @return {@code true} (as specified by {@link Collection#add}) 129 * @throws NullPointerException if the specified element is null 130 */ add(E e)131 public boolean add(E e) { 132 return offer(e); 133 } 134 135 /** 136 * Inserts the specified element into this delay queue. 137 * 138 * @param e the element to add 139 * @return {@code true} 140 * @throws NullPointerException if the specified element is null 141 */ offer(E e)142 public boolean offer(E e) { 143 final ReentrantLock lock = this.lock; 144 lock.lock(); 145 try { 146 q.offer(e); 147 if (q.peek() == e) { 148 leader = null; 149 available.signal(); 150 } 151 return true; 152 } finally { 153 lock.unlock(); 154 } 155 } 156 157 /** 158 * Inserts the specified element into this delay queue. As the queue is 159 * unbounded this method will never block. 160 * 161 * @param e the element to add 162 * @throws NullPointerException {@inheritDoc} 163 */ put(E e)164 public void put(E e) { 165 offer(e); 166 } 167 168 /** 169 * Inserts the specified element into this delay queue. As the queue is 170 * unbounded this method will never block. 171 * 172 * @param e the element to add 173 * @param timeout This parameter is ignored as the method never blocks 174 * @param unit This parameter is ignored as the method never blocks 175 * @return {@code true} 176 * @throws NullPointerException {@inheritDoc} 177 */ offer(E e, long timeout, TimeUnit unit)178 public boolean offer(E e, long timeout, TimeUnit unit) { 179 return offer(e); 180 } 181 182 /** 183 * Retrieves and removes the head of this queue, or returns {@code null} 184 * if this queue has no elements with an expired delay. 185 * 186 * @return the head of this queue, or {@code null} if this 187 * queue has no elements with an expired delay 188 */ poll()189 public E poll() { 190 final ReentrantLock lock = this.lock; 191 lock.lock(); 192 try { 193 E first = q.peek(); 194 return (first == null || first.getDelay(NANOSECONDS) > 0) 195 ? null 196 : q.poll(); 197 } finally { 198 lock.unlock(); 199 } 200 } 201 202 /** 203 * Retrieves and removes the head of this queue, waiting if necessary 204 * until an element with an expired delay is available on this queue. 205 * 206 * @return the head of this queue 207 * @throws InterruptedException {@inheritDoc} 208 */ take()209 public E take() throws InterruptedException { 210 final ReentrantLock lock = this.lock; 211 lock.lockInterruptibly(); 212 try { 213 for (;;) { 214 E first = q.peek(); 215 if (first == null) 216 available.await(); 217 else { 218 long delay = first.getDelay(NANOSECONDS); 219 if (delay <= 0L) 220 return q.poll(); 221 first = null; // don't retain ref while waiting 222 if (leader != null) 223 available.await(); 224 else { 225 Thread thisThread = Thread.currentThread(); 226 leader = thisThread; 227 try { 228 available.awaitNanos(delay); 229 } finally { 230 if (leader == thisThread) 231 leader = null; 232 } 233 } 234 } 235 } 236 } finally { 237 if (leader == null && q.peek() != null) 238 available.signal(); 239 lock.unlock(); 240 } 241 } 242 243 /** 244 * Retrieves and removes the head of this queue, waiting if necessary 245 * until an element with an expired delay is available on this queue, 246 * or the specified wait time expires. 247 * 248 * @return the head of this queue, or {@code null} if the 249 * specified waiting time elapses before an element with 250 * an expired delay becomes available 251 * @throws InterruptedException {@inheritDoc} 252 */ poll(long timeout, TimeUnit unit)253 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 254 long nanos = unit.toNanos(timeout); 255 final ReentrantLock lock = this.lock; 256 lock.lockInterruptibly(); 257 try { 258 for (;;) { 259 E first = q.peek(); 260 if (first == null) { 261 if (nanos <= 0L) 262 return null; 263 else 264 nanos = available.awaitNanos(nanos); 265 } else { 266 long delay = first.getDelay(NANOSECONDS); 267 if (delay <= 0L) 268 return q.poll(); 269 if (nanos <= 0L) 270 return null; 271 first = null; // don't retain ref while waiting 272 if (nanos < delay || leader != null) 273 nanos = available.awaitNanos(nanos); 274 else { 275 Thread thisThread = Thread.currentThread(); 276 leader = thisThread; 277 try { 278 long timeLeft = available.awaitNanos(delay); 279 nanos -= delay - timeLeft; 280 } finally { 281 if (leader == thisThread) 282 leader = null; 283 } 284 } 285 } 286 } 287 } finally { 288 if (leader == null && q.peek() != null) 289 available.signal(); 290 lock.unlock(); 291 } 292 } 293 294 /** 295 * Retrieves, but does not remove, the head of this queue, or 296 * returns {@code null} if this queue is empty. Unlike 297 * {@code poll}, if no expired elements are available in the queue, 298 * this method returns the element that will expire next, 299 * if one exists. 300 * 301 * @return the head of this queue, or {@code null} if this 302 * queue is empty 303 */ peek()304 public E peek() { 305 final ReentrantLock lock = this.lock; 306 lock.lock(); 307 try { 308 return q.peek(); 309 } finally { 310 lock.unlock(); 311 } 312 } 313 size()314 public int size() { 315 final ReentrantLock lock = this.lock; 316 lock.lock(); 317 try { 318 return q.size(); 319 } finally { 320 lock.unlock(); 321 } 322 } 323 324 /** 325 * Returns first element only if it is expired. 326 * Used only by drainTo. Call only when holding lock. 327 */ peekExpired()328 private E peekExpired() { 329 // assert lock.isHeldByCurrentThread(); 330 E first = q.peek(); 331 return (first == null || first.getDelay(NANOSECONDS) > 0) ? 332 null : first; 333 } 334 335 /** 336 * @throws UnsupportedOperationException {@inheritDoc} 337 * @throws ClassCastException {@inheritDoc} 338 * @throws NullPointerException {@inheritDoc} 339 * @throws IllegalArgumentException {@inheritDoc} 340 */ drainTo(Collection<? super E> c)341 public int drainTo(Collection<? super E> c) { 342 if (c == null) 343 throw new NullPointerException(); 344 if (c == this) 345 throw new IllegalArgumentException(); 346 final ReentrantLock lock = this.lock; 347 lock.lock(); 348 try { 349 int n = 0; 350 for (E e; (e = peekExpired()) != null;) { 351 c.add(e); // In this order, in case add() throws. 352 q.poll(); 353 ++n; 354 } 355 return n; 356 } finally { 357 lock.unlock(); 358 } 359 } 360 361 /** 362 * @throws UnsupportedOperationException {@inheritDoc} 363 * @throws ClassCastException {@inheritDoc} 364 * @throws NullPointerException {@inheritDoc} 365 * @throws IllegalArgumentException {@inheritDoc} 366 */ drainTo(Collection<? super E> c, int maxElements)367 public int drainTo(Collection<? super E> c, int maxElements) { 368 if (c == null) 369 throw new NullPointerException(); 370 if (c == this) 371 throw new IllegalArgumentException(); 372 if (maxElements <= 0) 373 return 0; 374 final ReentrantLock lock = this.lock; 375 lock.lock(); 376 try { 377 int n = 0; 378 for (E e; n < maxElements && (e = peekExpired()) != null;) { 379 c.add(e); // In this order, in case add() throws. 380 q.poll(); 381 ++n; 382 } 383 return n; 384 } finally { 385 lock.unlock(); 386 } 387 } 388 389 /** 390 * Atomically removes all of the elements from this delay queue. 391 * The queue will be empty after this call returns. 392 * Elements with an unexpired delay are not waited for; they are 393 * simply discarded from the queue. 394 */ clear()395 public void clear() { 396 final ReentrantLock lock = this.lock; 397 lock.lock(); 398 try { 399 q.clear(); 400 } finally { 401 lock.unlock(); 402 } 403 } 404 405 /** 406 * Always returns {@code Integer.MAX_VALUE} because 407 * a {@code DelayQueue} is not capacity constrained. 408 * 409 * @return {@code Integer.MAX_VALUE} 410 */ remainingCapacity()411 public int remainingCapacity() { 412 return Integer.MAX_VALUE; 413 } 414 415 /** 416 * Returns an array containing all of the elements in this queue. 417 * The returned array elements are in no particular order. 418 * 419 * <p>The returned array will be "safe" in that no references to it are 420 * maintained by this queue. (In other words, this method must allocate 421 * a new array). The caller is thus free to modify the returned array. 422 * 423 * <p>This method acts as bridge between array-based and collection-based 424 * APIs. 425 * 426 * @return an array containing all of the elements in this queue 427 */ toArray()428 public Object[] toArray() { 429 final ReentrantLock lock = this.lock; 430 lock.lock(); 431 try { 432 return q.toArray(); 433 } finally { 434 lock.unlock(); 435 } 436 } 437 438 /** 439 * Returns an array containing all of the elements in this queue; the 440 * runtime type of the returned array is that of the specified array. 441 * The returned array elements are in no particular order. 442 * If the queue fits in the specified array, it is returned therein. 443 * Otherwise, a new array is allocated with the runtime type of the 444 * specified array and the size of this queue. 445 * 446 * <p>If this queue fits in the specified array with room to spare 447 * (i.e., the array has more elements than this queue), the element in 448 * the array immediately following the end of the queue is set to 449 * {@code null}. 450 * 451 * <p>Like the {@link #toArray()} method, this method acts as bridge between 452 * array-based and collection-based APIs. Further, this method allows 453 * precise control over the runtime type of the output array, and may, 454 * under certain circumstances, be used to save allocation costs. 455 * 456 * <p>The following code can be used to dump a delay queue into a newly 457 * allocated array of {@code Delayed}: 458 * 459 * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre> 460 * 461 * Note that {@code toArray(new Object[0])} is identical in function to 462 * {@code toArray()}. 463 * 464 * @param a the array into which the elements of the queue are to 465 * be stored, if it is big enough; otherwise, a new array of the 466 * same runtime type is allocated for this purpose 467 * @return an array containing all of the elements in this queue 468 * @throws ArrayStoreException if the runtime type of the specified array 469 * is not a supertype of the runtime type of every element in 470 * this queue 471 * @throws NullPointerException if the specified array is null 472 */ toArray(T[] a)473 public <T> T[] toArray(T[] a) { 474 final ReentrantLock lock = this.lock; 475 lock.lock(); 476 try { 477 return q.toArray(a); 478 } finally { 479 lock.unlock(); 480 } 481 } 482 483 /** 484 * Removes a single instance of the specified element from this 485 * queue, if it is present, whether or not it has expired. 486 */ remove(Object o)487 public boolean remove(Object o) { 488 final ReentrantLock lock = this.lock; 489 lock.lock(); 490 try { 491 return q.remove(o); 492 } finally { 493 lock.unlock(); 494 } 495 } 496 497 /** 498 * Identity-based version for use in Itr.remove. 499 */ removeEQ(Object o)500 void removeEQ(Object o) { 501 final ReentrantLock lock = this.lock; 502 lock.lock(); 503 try { 504 for (Iterator<E> it = q.iterator(); it.hasNext(); ) { 505 if (o == it.next()) { 506 it.remove(); 507 break; 508 } 509 } 510 } finally { 511 lock.unlock(); 512 } 513 } 514 515 /** 516 * Returns an iterator over all the elements (both expired and 517 * unexpired) in this queue. The iterator does not return the 518 * elements in any particular order. 519 * 520 * <p>The returned iterator is 521 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 522 * 523 * @return an iterator over the elements in this queue 524 */ iterator()525 public Iterator<E> iterator() { 526 return new Itr(toArray()); 527 } 528 529 /** 530 * Snapshot iterator that works off copy of underlying q array. 531 */ 532 private class Itr implements Iterator<E> { 533 final Object[] array; // Array of all elements 534 int cursor; // index of next element to return 535 int lastRet; // index of last element, or -1 if no such 536 Itr(Object[] array)537 Itr(Object[] array) { 538 lastRet = -1; 539 this.array = array; 540 } 541 hasNext()542 public boolean hasNext() { 543 return cursor < array.length; 544 } 545 546 @SuppressWarnings("unchecked") next()547 public E next() { 548 if (cursor >= array.length) 549 throw new NoSuchElementException(); 550 lastRet = cursor; 551 return (E)array[cursor++]; 552 } 553 remove()554 public void remove() { 555 if (lastRet < 0) 556 throw new IllegalStateException(); 557 removeEQ(array[lastRet]); 558 lastRet = -1; 559 } 560 } 561 562 } 563