1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.util.AbstractQueue; 10 import java.util.Arrays; 11 import java.util.Collection; 12 import java.util.Iterator; 13 import java.util.NoSuchElementException; 14 import java.util.Queue; 15 import java.util.Spliterator; 16 import java.util.Spliterators; 17 import java.util.concurrent.locks.LockSupport; 18 import java.util.function.Consumer; 19 20 // BEGIN android-note 21 // removed link to collections framework docs 22 // END android-note 23 24 /** 25 * An unbounded {@link TransferQueue} based on linked nodes. 26 * This queue orders elements FIFO (first-in-first-out) with respect 27 * to any given producer. The <em>head</em> of the queue is that 28 * element that has been on the queue the longest time for some 29 * producer. The <em>tail</em> of the queue is that element that has 30 * been on the queue the shortest time for some producer. 31 * 32 * <p>Beware that, unlike in most collections, the {@code size} method 33 * is <em>NOT</em> a constant-time operation. Because of the 34 * asynchronous nature of these queues, determining the current number 35 * of elements requires a traversal of the elements, and so may report 36 * inaccurate results if this collection is modified during traversal. 37 * Additionally, the bulk operations {@code addAll}, 38 * {@code removeAll}, {@code retainAll}, {@code containsAll}, 39 * {@code equals}, and {@code toArray} are <em>not</em> guaranteed 40 * to be performed atomically. For example, an iterator operating 41 * concurrently with an {@code addAll} operation might view only some 42 * of the added elements. 43 * 44 * <p>This class and its iterator implement all of the 45 * <em>optional</em> methods of the {@link Collection} and {@link 46 * Iterator} interfaces. 47 * 48 * <p>Memory consistency effects: As with other concurrent 49 * collections, actions in a thread prior to placing an object into a 50 * {@code LinkedTransferQueue} 51 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> 52 * actions subsequent to the access or removal of that element from 53 * the {@code LinkedTransferQueue} in another thread. 54 * 55 * @since 1.7 56 * @author Doug Lea 57 * @param <E> the type of elements held in this queue 58 */ 59 public class LinkedTransferQueue<E> extends AbstractQueue<E> 60 implements TransferQueue<E>, java.io.Serializable { 61 private static final long serialVersionUID = -3223113410248163686L; 62 63 /* 64 * *** Overview of Dual Queues with Slack *** 65 * 66 * Dual Queues, introduced by Scherer and Scott 67 * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are 68 * (linked) queues in which nodes may represent either data or 69 * requests. When a thread tries to enqueue a data node, but 70 * encounters a request node, it instead "matches" and removes it; 71 * and vice versa for enqueuing requests. Blocking Dual Queues 72 * arrange that threads enqueuing unmatched requests block until 73 * other threads provide the match. Dual Synchronous Queues (see 74 * Scherer, Lea, & Scott 75 * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf) 76 * additionally arrange that threads enqueuing unmatched data also 77 * block. Dual Transfer Queues support all of these modes, as 78 * dictated by callers. 79 * 80 * A FIFO dual queue may be implemented using a variation of the 81 * Michael & Scott (M&S) lock-free queue algorithm 82 * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf). 83 * It maintains two pointer fields, "head", pointing to a 84 * (matched) node that in turn points to the first actual 85 * (unmatched) queue node (or null if empty); and "tail" that 86 * points to the last node on the queue (or again null if 87 * empty). For example, here is a possible queue with four data 88 * elements: 89 * 90 * head tail 91 * | | 92 * v v 93 * M -> U -> U -> U -> U 94 * 95 * The M&S queue algorithm is known to be prone to scalability and 96 * overhead limitations when maintaining (via CAS) these head and 97 * tail pointers. This has led to the development of 98 * contention-reducing variants such as elimination arrays (see 99 * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and 100 * optimistic back pointers (see Ladan-Mozes & Shavit 101 * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf). 102 * However, the nature of dual queues enables a simpler tactic for 103 * improving M&S-style implementations when dual-ness is needed. 104 * 105 * In a dual queue, each node must atomically maintain its match 106 * status. While there are other possible variants, we implement 107 * this here as: for a data-mode node, matching entails CASing an 108 * "item" field from a non-null data value to null upon match, and 109 * vice-versa for request nodes, CASing from null to a data 110 * value. (Note that the linearization properties of this style of 111 * queue are easy to verify -- elements are made available by 112 * linking, and unavailable by matching.) Compared to plain M&S 113 * queues, this property of dual queues requires one additional 114 * successful atomic operation per enq/deq pair. But it also 115 * enables lower cost variants of queue maintenance mechanics. (A 116 * variation of this idea applies even for non-dual queues that 117 * support deletion of interior elements, such as 118 * j.u.c.ConcurrentLinkedQueue.) 119 * 120 * Once a node is matched, its match status can never again 121 * change. We may thus arrange that the linked list of them 122 * contain a prefix of zero or more matched nodes, followed by a 123 * suffix of zero or more unmatched nodes. (Note that we allow 124 * both the prefix and suffix to be zero length, which in turn 125 * means that we do not use a dummy header.) If we were not 126 * concerned with either time or space efficiency, we could 127 * correctly perform enqueue and dequeue operations by traversing 128 * from a pointer to the initial node; CASing the item of the 129 * first unmatched node on match and CASing the next field of the 130 * trailing node on appends. (Plus some special-casing when 131 * initially empty). While this would be a terrible idea in 132 * itself, it does have the benefit of not requiring ANY atomic 133 * updates on head/tail fields. 134 * 135 * We introduce here an approach that lies between the extremes of 136 * never versus always updating queue (head and tail) pointers. 137 * This offers a tradeoff between sometimes requiring extra 138 * traversal steps to locate the first and/or last unmatched 139 * nodes, versus the reduced overhead and contention of fewer 140 * updates to queue pointers. For example, a possible snapshot of 141 * a queue is: 142 * 143 * head tail 144 * | | 145 * v v 146 * M -> M -> U -> U -> U -> U 147 * 148 * The best value for this "slack" (the targeted maximum distance 149 * between the value of "head" and the first unmatched node, and 150 * similarly for "tail") is an empirical matter. We have found 151 * that using very small constants in the range of 1-3 work best 152 * over a range of platforms. Larger values introduce increasing 153 * costs of cache misses and risks of long traversal chains, while 154 * smaller values increase CAS contention and overhead. 155 * 156 * Dual queues with slack differ from plain M&S dual queues by 157 * virtue of only sometimes updating head or tail pointers when 158 * matching, appending, or even traversing nodes; in order to 159 * maintain a targeted slack. The idea of "sometimes" may be 160 * operationalized in several ways. The simplest is to use a 161 * per-operation counter incremented on each traversal step, and 162 * to try (via CAS) to update the associated queue pointer 163 * whenever the count exceeds a threshold. Another, that requires 164 * more overhead, is to use random number generators to update 165 * with a given probability per traversal step. 166 * 167 * In any strategy along these lines, because CASes updating 168 * fields may fail, the actual slack may exceed targeted 169 * slack. However, they may be retried at any time to maintain 170 * targets. Even when using very small slack values, this 171 * approach works well for dual queues because it allows all 172 * operations up to the point of matching or appending an item 173 * (hence potentially allowing progress by another thread) to be 174 * read-only, thus not introducing any further contention. As 175 * described below, we implement this by performing slack 176 * maintenance retries only after these points. 177 * 178 * As an accompaniment to such techniques, traversal overhead can 179 * be further reduced without increasing contention of head 180 * pointer updates: Threads may sometimes shortcut the "next" link 181 * path from the current "head" node to be closer to the currently 182 * known first unmatched node, and similarly for tail. Again, this 183 * may be triggered with using thresholds or randomization. 184 * 185 * These ideas must be further extended to avoid unbounded amounts 186 * of costly-to-reclaim garbage caused by the sequential "next" 187 * links of nodes starting at old forgotten head nodes: As first 188 * described in detail by Boehm 189 * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC 190 * delays noticing that any arbitrarily old node has become 191 * garbage, all newer dead nodes will also be unreclaimed. 192 * (Similar issues arise in non-GC environments.) To cope with 193 * this in our implementation, upon CASing to advance the head 194 * pointer, we set the "next" link of the previous head to point 195 * only to itself; thus limiting the length of connected dead lists. 196 * (We also take similar care to wipe out possibly garbage 197 * retaining values held in other Node fields.) However, doing so 198 * adds some further complexity to traversal: If any "next" 199 * pointer links to itself, it indicates that the current thread 200 * has lagged behind a head-update, and so the traversal must 201 * continue from the "head". Traversals trying to find the 202 * current tail starting from "tail" may also encounter 203 * self-links, in which case they also continue at "head". 204 * 205 * It is tempting in slack-based scheme to not even use CAS for 206 * updates (similarly to Ladan-Mozes & Shavit). However, this 207 * cannot be done for head updates under the above link-forgetting 208 * mechanics because an update may leave head at a detached node. 209 * And while direct writes are possible for tail updates, they 210 * increase the risk of long retraversals, and hence long garbage 211 * chains, which can be much more costly than is worthwhile 212 * considering that the cost difference of performing a CAS vs 213 * write is smaller when they are not triggered on each operation 214 * (especially considering that writes and CASes equally require 215 * additional GC bookkeeping ("write barriers") that are sometimes 216 * more costly than the writes themselves because of contention). 217 * 218 * *** Overview of implementation *** 219 * 220 * We use a threshold-based approach to updates, with a slack 221 * threshold of two -- that is, we update head/tail when the 222 * current pointer appears to be two or more steps away from the 223 * first/last node. The slack value is hard-wired: a path greater 224 * than one is naturally implemented by checking equality of 225 * traversal pointers except when the list has only one element, 226 * in which case we keep slack threshold at one. Avoiding tracking 227 * explicit counts across method calls slightly simplifies an 228 * already-messy implementation. Using randomization would 229 * probably work better if there were a low-quality dirt-cheap 230 * per-thread one available, but even ThreadLocalRandom is too 231 * heavy for these purposes. 232 * 233 * With such a small slack threshold value, it is not worthwhile 234 * to augment this with path short-circuiting (i.e., unsplicing 235 * interior nodes) except in the case of cancellation/removal (see 236 * below). 237 * 238 * We allow both the head and tail fields to be null before any 239 * nodes are enqueued; initializing upon first append. This 240 * simplifies some other logic, as well as providing more 241 * efficient explicit control paths instead of letting JVMs insert 242 * implicit NullPointerExceptions when they are null. While not 243 * currently fully implemented, we also leave open the possibility 244 * of re-nulling these fields when empty (which is complicated to 245 * arrange, for little benefit.) 246 * 247 * All enqueue/dequeue operations are handled by the single method 248 * "xfer" with parameters indicating whether to act as some form 249 * of offer, put, poll, take, or transfer (each possibly with 250 * timeout). The relative complexity of using one monolithic 251 * method outweighs the code bulk and maintenance problems of 252 * using separate methods for each case. 253 * 254 * Operation consists of up to three phases. The first is 255 * implemented within method xfer, the second in tryAppend, and 256 * the third in method awaitMatch. 257 * 258 * 1. Try to match an existing node 259 * 260 * Starting at head, skip already-matched nodes until finding 261 * an unmatched node of opposite mode, if one exists, in which 262 * case matching it and returning, also if necessary updating 263 * head to one past the matched node (or the node itself if the 264 * list has no other unmatched nodes). If the CAS misses, then 265 * a loop retries advancing head by two steps until either 266 * success or the slack is at most two. By requiring that each 267 * attempt advances head by two (if applicable), we ensure that 268 * the slack does not grow without bound. Traversals also check 269 * if the initial head is now off-list, in which case they 270 * start at the new head. 271 * 272 * If no candidates are found and the call was untimed 273 * poll/offer, (argument "how" is NOW) return. 274 * 275 * 2. Try to append a new node (method tryAppend) 276 * 277 * Starting at current tail pointer, find the actual last node 278 * and try to append a new node (or if head was null, establish 279 * the first node). Nodes can be appended only if their 280 * predecessors are either already matched or are of the same 281 * mode. If we detect otherwise, then a new node with opposite 282 * mode must have been appended during traversal, so we must 283 * restart at phase 1. The traversal and update steps are 284 * otherwise similar to phase 1: Retrying upon CAS misses and 285 * checking for staleness. In particular, if a self-link is 286 * encountered, then we can safely jump to a node on the list 287 * by continuing the traversal at current head. 288 * 289 * On successful append, if the call was ASYNC, return. 290 * 291 * 3. Await match or cancellation (method awaitMatch) 292 * 293 * Wait for another thread to match node; instead cancelling if 294 * the current thread was interrupted or the wait timed out. On 295 * multiprocessors, we use front-of-queue spinning: If a node 296 * appears to be the first unmatched node in the queue, it 297 * spins a bit before blocking. In either case, before blocking 298 * it tries to unsplice any nodes between the current "head" 299 * and the first unmatched node. 300 * 301 * Front-of-queue spinning vastly improves performance of 302 * heavily contended queues. And so long as it is relatively 303 * brief and "quiet", spinning does not much impact performance 304 * of less-contended queues. During spins threads check their 305 * interrupt status and generate a thread-local random number 306 * to decide to occasionally perform a Thread.yield. While 307 * yield has underdefined specs, we assume that it might help, 308 * and will not hurt, in limiting impact of spinning on busy 309 * systems. We also use smaller (1/2) spins for nodes that are 310 * not known to be front but whose predecessors have not 311 * blocked -- these "chained" spins avoid artifacts of 312 * front-of-queue rules which otherwise lead to alternating 313 * nodes spinning vs blocking. Further, front threads that 314 * represent phase changes (from data to request node or vice 315 * versa) compared to their predecessors receive additional 316 * chained spins, reflecting longer paths typically required to 317 * unblock threads during phase changes. 318 * 319 * 320 * ** Unlinking removed interior nodes ** 321 * 322 * In addition to minimizing garbage retention via self-linking 323 * described above, we also unlink removed interior nodes. These 324 * may arise due to timed out or interrupted waits, or calls to 325 * remove(x) or Iterator.remove. Normally, given a node that was 326 * at one time known to be the predecessor of some node s that is 327 * to be removed, we can unsplice s by CASing the next field of 328 * its predecessor if it still points to s (otherwise s must 329 * already have been removed or is now offlist). But there are two 330 * situations in which we cannot guarantee to make node s 331 * unreachable in this way: (1) If s is the trailing node of list 332 * (i.e., with null next), then it is pinned as the target node 333 * for appends, so can only be removed later after other nodes are 334 * appended. (2) We cannot necessarily unlink s given a 335 * predecessor node that is matched (including the case of being 336 * cancelled): the predecessor may already be unspliced, in which 337 * case some previous reachable node may still point to s. 338 * (For further explanation see Herlihy & Shavit "The Art of 339 * Multiprocessor Programming" chapter 9). Although, in both 340 * cases, we can rule out the need for further action if either s 341 * or its predecessor are (or can be made to be) at, or fall off 342 * from, the head of list. 343 * 344 * Without taking these into account, it would be possible for an 345 * unbounded number of supposedly removed nodes to remain 346 * reachable. Situations leading to such buildup are uncommon but 347 * can occur in practice; for example when a series of short timed 348 * calls to poll repeatedly time out but never otherwise fall off 349 * the list because of an untimed call to take at the front of the 350 * queue. 351 * 352 * When these cases arise, rather than always retraversing the 353 * entire list to find an actual predecessor to unlink (which 354 * won't help for case (1) anyway), we record a conservative 355 * estimate of possible unsplice failures (in "sweepVotes"). 356 * We trigger a full sweep when the estimate exceeds a threshold 357 * ("SWEEP_THRESHOLD") indicating the maximum number of estimated 358 * removal failures to tolerate before sweeping through, unlinking 359 * cancelled nodes that were not unlinked upon initial removal. 360 * We perform sweeps by the thread hitting threshold (rather than 361 * background threads or by spreading work to other threads) 362 * because in the main contexts in which removal occurs, the 363 * caller is already timed-out, cancelled, or performing a 364 * potentially O(n) operation (e.g. remove(x)), none of which are 365 * time-critical enough to warrant the overhead that alternatives 366 * would impose on other threads. 367 * 368 * Because the sweepVotes estimate is conservative, and because 369 * nodes become unlinked "naturally" as they fall off the head of 370 * the queue, and because we allow votes to accumulate even while 371 * sweeps are in progress, there are typically significantly fewer 372 * such nodes than estimated. Choice of a threshold value 373 * balances the likelihood of wasted effort and contention, versus 374 * providing a worst-case bound on retention of interior nodes in 375 * quiescent queues. The value defined below was chosen 376 * empirically to balance these under various timeout scenarios. 377 * 378 * Note that we cannot self-link unlinked interior nodes during 379 * sweeps. However, the associated garbage chains terminate when 380 * some successor ultimately falls off the head of the list and is 381 * self-linked. 382 */ 383 384 /** True if on multiprocessor */ 385 private static final boolean MP = 386 Runtime.getRuntime().availableProcessors() > 1; 387 388 /** 389 * The number of times to spin (with randomly interspersed calls 390 * to Thread.yield) on multiprocessor before blocking when a node 391 * is apparently the first waiter in the queue. See above for 392 * explanation. Must be a power of two. The value is empirically 393 * derived -- it works pretty well across a variety of processors, 394 * numbers of CPUs, and OSes. 395 */ 396 private static final int FRONT_SPINS = 1 << 7; 397 398 /** 399 * The number of times to spin before blocking when a node is 400 * preceded by another node that is apparently spinning. Also 401 * serves as an increment to FRONT_SPINS on phase changes, and as 402 * base average frequency for yielding during spins. Must be a 403 * power of two. 404 */ 405 private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; 406 407 /** 408 * The maximum number of estimated removal failures (sweepVotes) 409 * to tolerate before sweeping through the queue unlinking 410 * cancelled nodes that were not unlinked upon initial 411 * removal. See above for explanation. The value must be at least 412 * two to avoid useless sweeps when removing trailing nodes. 413 */ 414 static final int SWEEP_THRESHOLD = 32; 415 416 /** 417 * Queue nodes. Uses Object, not E, for items to allow forgetting 418 * them after use. Relies heavily on Unsafe mechanics to minimize 419 * unnecessary ordering constraints: Writes that are intrinsically 420 * ordered wrt other accesses or CASes use simple relaxed forms. 421 */ 422 static final class Node { 423 final boolean isData; // false if this is a request node 424 volatile Object item; // initially non-null if isData; CASed to match 425 volatile Node next; 426 volatile Thread waiter; // null until waiting 427 428 // CAS methods for fields casNext(Node cmp, Node val)429 final boolean casNext(Node cmp, Node val) { 430 return U.compareAndSwapObject(this, NEXT, cmp, val); 431 } 432 casItem(Object cmp, Object val)433 final boolean casItem(Object cmp, Object val) { 434 // assert cmp == null || cmp.getClass() != Node.class; 435 return U.compareAndSwapObject(this, ITEM, cmp, val); 436 } 437 438 /** 439 * Constructs a new node. Uses relaxed write because item can 440 * only be seen after publication via casNext. 441 */ Node(Object item, boolean isData)442 Node(Object item, boolean isData) { 443 U.putObject(this, ITEM, item); // relaxed write 444 this.isData = isData; 445 } 446 447 /** 448 * Links node to itself to avoid garbage retention. Called 449 * only after CASing head field, so uses relaxed write. 450 */ forgetNext()451 final void forgetNext() { 452 U.putObject(this, NEXT, this); 453 } 454 455 /** 456 * Sets item to self and waiter to null, to avoid garbage 457 * retention after matching or cancelling. Uses relaxed writes 458 * because order is already constrained in the only calling 459 * contexts: item is forgotten only after volatile/atomic 460 * mechanics that extract items. Similarly, clearing waiter 461 * follows either CAS or return from park (if ever parked; 462 * else we don't care). 463 */ forgetContents()464 final void forgetContents() { 465 U.putObject(this, ITEM, this); 466 U.putObject(this, WAITER, null); 467 } 468 469 /** 470 * Returns true if this node has been matched, including the 471 * case of artificial matches due to cancellation. 472 */ isMatched()473 final boolean isMatched() { 474 Object x = item; 475 return (x == this) || ((x == null) == isData); 476 } 477 478 /** 479 * Returns true if this is an unmatched request node. 480 */ isUnmatchedRequest()481 final boolean isUnmatchedRequest() { 482 return !isData && item == null; 483 } 484 485 /** 486 * Returns true if a node with the given mode cannot be 487 * appended to this node because this node is unmatched and 488 * has opposite data mode. 489 */ cannotPrecede(boolean haveData)490 final boolean cannotPrecede(boolean haveData) { 491 boolean d = isData; 492 Object x; 493 return d != haveData && (x = item) != this && (x != null) == d; 494 } 495 496 /** 497 * Tries to artificially match a data node -- used by remove. 498 */ tryMatchData()499 final boolean tryMatchData() { 500 // assert isData; 501 Object x = item; 502 if (x != null && x != this && casItem(x, null)) { 503 LockSupport.unpark(waiter); 504 return true; 505 } 506 return false; 507 } 508 509 private static final long serialVersionUID = -3375979862319811754L; 510 511 // Unsafe mechanics 512 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 513 private static final long ITEM; 514 private static final long NEXT; 515 private static final long WAITER; 516 static { 517 try { 518 ITEM = U.objectFieldOffset 519 (Node.class.getDeclaredField("item")); 520 NEXT = U.objectFieldOffset 521 (Node.class.getDeclaredField("next")); 522 WAITER = U.objectFieldOffset 523 (Node.class.getDeclaredField("waiter")); 524 } catch (ReflectiveOperationException e) { 525 throw new Error(e); 526 } 527 } 528 } 529 530 /** head of the queue; null until first enqueue */ 531 transient volatile Node head; 532 533 /** tail of the queue; null until first append */ 534 private transient volatile Node tail; 535 536 /** The number of apparent failures to unsplice removed nodes */ 537 private transient volatile int sweepVotes; 538 539 // CAS methods for fields casTail(Node cmp, Node val)540 private boolean casTail(Node cmp, Node val) { 541 return U.compareAndSwapObject(this, TAIL, cmp, val); 542 } 543 casHead(Node cmp, Node val)544 private boolean casHead(Node cmp, Node val) { 545 return U.compareAndSwapObject(this, HEAD, cmp, val); 546 } 547 casSweepVotes(int cmp, int val)548 private boolean casSweepVotes(int cmp, int val) { 549 return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val); 550 } 551 552 /* 553 * Possible values for "how" argument in xfer method. 554 */ 555 private static final int NOW = 0; // for untimed poll, tryTransfer 556 private static final int ASYNC = 1; // for offer, put, add 557 private static final int SYNC = 2; // for transfer, take 558 private static final int TIMED = 3; // for timed poll, tryTransfer 559 560 /** 561 * Implements all queuing methods. See above for explanation. 562 * 563 * @param e the item or null for take 564 * @param haveData true if this is a put, else a take 565 * @param how NOW, ASYNC, SYNC, or TIMED 566 * @param nanos timeout in nanosecs, used only if mode is TIMED 567 * @return an item if matched, else e 568 * @throws NullPointerException if haveData mode but e is null 569 */ xfer(E e, boolean haveData, int how, long nanos)570 private E xfer(E e, boolean haveData, int how, long nanos) { 571 if (haveData && (e == null)) 572 throw new NullPointerException(); 573 Node s = null; // the node to append, if needed 574 575 retry: 576 for (;;) { // restart on append race 577 578 for (Node h = head, p = h; p != null;) { // find & match first node 579 boolean isData = p.isData; 580 Object item = p.item; 581 if (item != p && (item != null) == isData) { // unmatched 582 if (isData == haveData) // can't match 583 break; 584 if (p.casItem(item, e)) { // match 585 for (Node q = p; q != h;) { 586 Node n = q.next; // update by 2 unless singleton 587 if (head == h && casHead(h, n == null ? q : n)) { 588 h.forgetNext(); 589 break; 590 } // advance and retry 591 if ((h = head) == null || 592 (q = h.next) == null || !q.isMatched()) 593 break; // unless slack < 2 594 } 595 LockSupport.unpark(p.waiter); 596 @SuppressWarnings("unchecked") E itemE = (E) item; 597 return itemE; 598 } 599 } 600 Node n = p.next; 601 p = (p != n) ? n : (h = head); // Use head if p offlist 602 } 603 604 if (how != NOW) { // No matches available 605 if (s == null) 606 s = new Node(e, haveData); 607 Node pred = tryAppend(s, haveData); 608 if (pred == null) 609 continue retry; // lost race vs opposite mode 610 if (how != ASYNC) 611 return awaitMatch(s, pred, e, (how == TIMED), nanos); 612 } 613 return e; // not waiting 614 } 615 } 616 617 /** 618 * Tries to append node s as tail. 619 * 620 * @param s the node to append 621 * @param haveData true if appending in data mode 622 * @return null on failure due to losing race with append in 623 * different mode, else s's predecessor, or s itself if no 624 * predecessor 625 */ tryAppend(Node s, boolean haveData)626 private Node tryAppend(Node s, boolean haveData) { 627 for (Node t = tail, p = t;;) { // move p to last node and append 628 Node n, u; // temps for reads of next & tail 629 if (p == null && (p = head) == null) { 630 if (casHead(null, s)) 631 return s; // initialize 632 } 633 else if (p.cannotPrecede(haveData)) 634 return null; // lost race vs opposite mode 635 else if ((n = p.next) != null) // not last; keep traversing 636 p = p != t && t != (u = tail) ? (t = u) : // stale tail 637 (p != n) ? n : null; // restart if off list 638 else if (!p.casNext(null, s)) 639 p = p.next; // re-read on CAS failure 640 else { 641 if (p != t) { // update if slack now >= 2 642 while ((tail != t || !casTail(t, s)) && 643 (t = tail) != null && 644 (s = t.next) != null && // advance and retry 645 (s = s.next) != null && s != t); 646 } 647 return p; 648 } 649 } 650 } 651 652 /** 653 * Spins/yields/blocks until node s is matched or caller gives up. 654 * 655 * @param s the waiting node 656 * @param pred the predecessor of s, or s itself if it has no 657 * predecessor, or null if unknown (the null case does not occur 658 * in any current calls but may in possible future extensions) 659 * @param e the comparison value for checking match 660 * @param timed if true, wait only until timeout elapses 661 * @param nanos timeout in nanosecs, used only if timed is true 662 * @return matched item, or e if unmatched on interrupt or timeout 663 */ awaitMatch(Node s, Node pred, E e, boolean timed, long nanos)664 private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { 665 final long deadline = timed ? System.nanoTime() + nanos : 0L; 666 Thread w = Thread.currentThread(); 667 int spins = -1; // initialized after first item and cancel checks 668 ThreadLocalRandom randomYields = null; // bound if needed 669 670 for (;;) { 671 Object item = s.item; 672 if (item != e) { // matched 673 // assert item != s; 674 s.forgetContents(); // avoid garbage 675 @SuppressWarnings("unchecked") E itemE = (E) item; 676 return itemE; 677 } 678 else if (w.isInterrupted() || (timed && nanos <= 0L)) { 679 unsplice(pred, s); // try to unlink and cancel 680 if (s.casItem(e, s)) // return normally if lost CAS 681 return e; 682 } 683 else if (spins < 0) { // establish spins at/near front 684 if ((spins = spinsFor(pred, s.isData)) > 0) 685 randomYields = ThreadLocalRandom.current(); 686 } 687 else if (spins > 0) { // spin 688 --spins; 689 if (randomYields.nextInt(CHAINED_SPINS) == 0) 690 Thread.yield(); // occasionally yield 691 } 692 else if (s.waiter == null) { 693 s.waiter = w; // request unpark then recheck 694 } 695 else if (timed) { 696 nanos = deadline - System.nanoTime(); 697 if (nanos > 0L) 698 LockSupport.parkNanos(this, nanos); 699 } 700 else { 701 LockSupport.park(this); 702 } 703 } 704 } 705 706 /** 707 * Returns spin/yield value for a node with given predecessor and 708 * data mode. See above for explanation. 709 */ spinsFor(Node pred, boolean haveData)710 private static int spinsFor(Node pred, boolean haveData) { 711 if (MP && pred != null) { 712 if (pred.isData != haveData) // phase change 713 return FRONT_SPINS + CHAINED_SPINS; 714 if (pred.isMatched()) // probably at front 715 return FRONT_SPINS; 716 if (pred.waiter == null) // pred apparently spinning 717 return CHAINED_SPINS; 718 } 719 return 0; 720 } 721 722 /* -------------- Traversal methods -------------- */ 723 724 /** 725 * Returns the successor of p, or the head node if p.next has been 726 * linked to self, which will only be true if traversing with a 727 * stale pointer that is now off the list. 728 */ succ(Node p)729 final Node succ(Node p) { 730 Node next = p.next; 731 return (p == next) ? head : next; 732 } 733 734 /** 735 * Returns the first unmatched data node, or null if none. 736 * Callers must recheck if the returned node's item field is null 737 * or self-linked before using. 738 */ firstDataNode()739 final Node firstDataNode() { 740 restartFromHead: for (;;) { 741 for (Node p = head; p != null;) { 742 Object item = p.item; 743 if (p.isData) { 744 if (item != null && item != p) 745 return p; 746 } 747 else if (item == null) 748 break; 749 if (p == (p = p.next)) 750 continue restartFromHead; 751 } 752 return null; 753 } 754 } 755 756 /** 757 * Traverses and counts unmatched nodes of the given mode. 758 * Used by methods size and getWaitingConsumerCount. 759 */ countOfMode(boolean data)760 private int countOfMode(boolean data) { 761 restartFromHead: for (;;) { 762 int count = 0; 763 for (Node p = head; p != null;) { 764 if (!p.isMatched()) { 765 if (p.isData != data) 766 return 0; 767 if (++count == Integer.MAX_VALUE) 768 break; // @see Collection.size() 769 } 770 if (p == (p = p.next)) 771 continue restartFromHead; 772 } 773 return count; 774 } 775 } 776 toString()777 public String toString() { 778 String[] a = null; 779 restartFromHead: for (;;) { 780 int charLength = 0; 781 int size = 0; 782 for (Node p = head; p != null;) { 783 Object item = p.item; 784 if (p.isData) { 785 if (item != null && item != p) { 786 if (a == null) 787 a = new String[4]; 788 else if (size == a.length) 789 a = Arrays.copyOf(a, 2 * size); 790 String s = item.toString(); 791 a[size++] = s; 792 charLength += s.length(); 793 } 794 } else if (item == null) 795 break; 796 if (p == (p = p.next)) 797 continue restartFromHead; 798 } 799 800 if (size == 0) 801 return "[]"; 802 803 return Helpers.toString(a, size, charLength); 804 } 805 } 806 toArrayInternal(Object[] a)807 private Object[] toArrayInternal(Object[] a) { 808 Object[] x = a; 809 restartFromHead: for (;;) { 810 int size = 0; 811 for (Node p = head; p != null;) { 812 Object item = p.item; 813 if (p.isData) { 814 if (item != null && item != p) { 815 if (x == null) 816 x = new Object[4]; 817 else if (size == x.length) 818 x = Arrays.copyOf(x, 2 * (size + 4)); 819 x[size++] = item; 820 } 821 } else if (item == null) 822 break; 823 if (p == (p = p.next)) 824 continue restartFromHead; 825 } 826 if (x == null) 827 return new Object[0]; 828 else if (a != null && size <= a.length) { 829 if (a != x) 830 System.arraycopy(x, 0, a, 0, size); 831 if (size < a.length) 832 a[size] = null; 833 return a; 834 } 835 return (size == x.length) ? x : Arrays.copyOf(x, size); 836 } 837 } 838 839 /** 840 * Returns an array containing all of the elements in this queue, in 841 * proper sequence. 842 * 843 * <p>The returned array will be "safe" in that no references to it are 844 * maintained by this queue. (In other words, this method must allocate 845 * a new array). The caller is thus free to modify the returned array. 846 * 847 * <p>This method acts as bridge between array-based and collection-based 848 * APIs. 849 * 850 * @return an array containing all of the elements in this queue 851 */ toArray()852 public Object[] toArray() { 853 return toArrayInternal(null); 854 } 855 856 /** 857 * Returns an array containing all of the elements in this queue, in 858 * proper sequence; the runtime type of the returned array is that of 859 * the specified array. If the queue fits in the specified array, it 860 * is returned therein. Otherwise, a new array is allocated with the 861 * runtime type of the specified array and the size of this queue. 862 * 863 * <p>If this queue fits in the specified array with room to spare 864 * (i.e., the array has more elements than this queue), the element in 865 * the array immediately following the end of the queue is set to 866 * {@code null}. 867 * 868 * <p>Like the {@link #toArray()} method, this method acts as bridge between 869 * array-based and collection-based APIs. Further, this method allows 870 * precise control over the runtime type of the output array, and may, 871 * under certain circumstances, be used to save allocation costs. 872 * 873 * <p>Suppose {@code x} is a queue known to contain only strings. 874 * The following code can be used to dump the queue into a newly 875 * allocated array of {@code String}: 876 * 877 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 878 * 879 * Note that {@code toArray(new Object[0])} is identical in function to 880 * {@code toArray()}. 881 * 882 * @param a the array into which the elements of the queue are to 883 * be stored, if it is big enough; otherwise, a new array of the 884 * same runtime type is allocated for this purpose 885 * @return an array containing all of the elements in this queue 886 * @throws ArrayStoreException if the runtime type of the specified array 887 * is not a supertype of the runtime type of every element in 888 * this queue 889 * @throws NullPointerException if the specified array is null 890 */ 891 @SuppressWarnings("unchecked") toArray(T[] a)892 public <T> T[] toArray(T[] a) { 893 if (a == null) throw new NullPointerException(); 894 return (T[]) toArrayInternal(a); 895 } 896 897 final class Itr implements Iterator<E> { 898 private Node nextNode; // next node to return item for 899 private E nextItem; // the corresponding item 900 private Node lastRet; // last returned node, to support remove 901 private Node lastPred; // predecessor to unlink lastRet 902 903 /** 904 * Moves to next node after prev, or first node if prev null. 905 */ advance(Node prev)906 private void advance(Node prev) { 907 /* 908 * To track and avoid buildup of deleted nodes in the face 909 * of calls to both Queue.remove and Itr.remove, we must 910 * include variants of unsplice and sweep upon each 911 * advance: Upon Itr.remove, we may need to catch up links 912 * from lastPred, and upon other removes, we might need to 913 * skip ahead from stale nodes and unsplice deleted ones 914 * found while advancing. 915 */ 916 917 Node r, b; // reset lastPred upon possible deletion of lastRet 918 if ((r = lastRet) != null && !r.isMatched()) 919 lastPred = r; // next lastPred is old lastRet 920 else if ((b = lastPred) == null || b.isMatched()) 921 lastPred = null; // at start of list 922 else { 923 Node s, n; // help with removal of lastPred.next 924 while ((s = b.next) != null && 925 s != b && s.isMatched() && 926 (n = s.next) != null && n != s) 927 b.casNext(s, n); 928 } 929 930 this.lastRet = prev; 931 932 for (Node p = prev, s, n;;) { 933 s = (p == null) ? head : p.next; 934 if (s == null) 935 break; 936 else if (s == p) { 937 p = null; 938 continue; 939 } 940 Object item = s.item; 941 if (s.isData) { 942 if (item != null && item != s) { 943 @SuppressWarnings("unchecked") E itemE = (E) item; 944 nextItem = itemE; 945 nextNode = s; 946 return; 947 } 948 } 949 else if (item == null) 950 break; 951 // assert s.isMatched(); 952 if (p == null) 953 p = s; 954 else if ((n = s.next) == null) 955 break; 956 else if (s == n) 957 p = null; 958 else 959 p.casNext(s, n); 960 } 961 nextNode = null; 962 nextItem = null; 963 } 964 Itr()965 Itr() { 966 advance(null); 967 } 968 hasNext()969 public final boolean hasNext() { 970 return nextNode != null; 971 } 972 next()973 public final E next() { 974 Node p = nextNode; 975 if (p == null) throw new NoSuchElementException(); 976 E e = nextItem; 977 advance(p); 978 return e; 979 } 980 remove()981 public final void remove() { 982 final Node lastRet = this.lastRet; 983 if (lastRet == null) 984 throw new IllegalStateException(); 985 this.lastRet = null; 986 if (lastRet.tryMatchData()) 987 unsplice(lastPred, lastRet); 988 } 989 } 990 991 /** A customized variant of Spliterators.IteratorSpliterator */ 992 final class LTQSpliterator<E> implements Spliterator<E> { 993 static final int MAX_BATCH = 1 << 25; // max batch array size; 994 Node current; // current node; null until initialized 995 int batch; // batch size for splits 996 boolean exhausted; // true when no more nodes LTQSpliterator()997 LTQSpliterator() {} 998 trySplit()999 public Spliterator<E> trySplit() { 1000 Node p; 1001 int b = batch; 1002 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; 1003 if (!exhausted && 1004 ((p = current) != null || (p = firstDataNode()) != null) && 1005 p.next != null) { 1006 Object[] a = new Object[n]; 1007 int i = 0; 1008 do { 1009 Object e = p.item; 1010 if (e != p && (a[i] = e) != null) 1011 ++i; 1012 if (p == (p = p.next)) 1013 p = firstDataNode(); 1014 } while (p != null && i < n && p.isData); 1015 if ((current = p) == null) 1016 exhausted = true; 1017 if (i > 0) { 1018 batch = i; 1019 return Spliterators.spliterator 1020 (a, 0, i, (Spliterator.ORDERED | 1021 Spliterator.NONNULL | 1022 Spliterator.CONCURRENT)); 1023 } 1024 } 1025 return null; 1026 } 1027 1028 @SuppressWarnings("unchecked") forEachRemaining(Consumer<? super E> action)1029 public void forEachRemaining(Consumer<? super E> action) { 1030 Node p; 1031 if (action == null) throw new NullPointerException(); 1032 if (!exhausted && 1033 ((p = current) != null || (p = firstDataNode()) != null)) { 1034 exhausted = true; 1035 do { 1036 Object e = p.item; 1037 if (e != null && e != p) 1038 action.accept((E)e); 1039 if (p == (p = p.next)) 1040 p = firstDataNode(); 1041 } while (p != null && p.isData); 1042 } 1043 } 1044 1045 @SuppressWarnings("unchecked") tryAdvance(Consumer<? super E> action)1046 public boolean tryAdvance(Consumer<? super E> action) { 1047 Node p; 1048 if (action == null) throw new NullPointerException(); 1049 if (!exhausted && 1050 ((p = current) != null || (p = firstDataNode()) != null)) { 1051 Object e; 1052 do { 1053 if ((e = p.item) == p) 1054 e = null; 1055 if (p == (p = p.next)) 1056 p = firstDataNode(); 1057 } while (e == null && p != null && p.isData); 1058 if ((current = p) == null) 1059 exhausted = true; 1060 if (e != null) { 1061 action.accept((E)e); 1062 return true; 1063 } 1064 } 1065 return false; 1066 } 1067 estimateSize()1068 public long estimateSize() { return Long.MAX_VALUE; } 1069 characteristics()1070 public int characteristics() { 1071 return Spliterator.ORDERED | Spliterator.NONNULL | 1072 Spliterator.CONCURRENT; 1073 } 1074 } 1075 1076 /** 1077 * Returns a {@link Spliterator} over the elements in this queue. 1078 * 1079 * <p>The returned spliterator is 1080 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1081 * 1082 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 1083 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 1084 * 1085 * @implNote 1086 * The {@code Spliterator} implements {@code trySplit} to permit limited 1087 * parallelism. 1088 * 1089 * @return a {@code Spliterator} over the elements in this queue 1090 * @since 1.8 1091 */ spliterator()1092 public Spliterator<E> spliterator() { 1093 return new LTQSpliterator<E>(); 1094 } 1095 1096 /* -------------- Removal methods -------------- */ 1097 1098 /** 1099 * Unsplices (now or later) the given deleted/cancelled node with 1100 * the given predecessor. 1101 * 1102 * @param pred a node that was at one time known to be the 1103 * predecessor of s, or null or s itself if s is/was at head 1104 * @param s the node to be unspliced 1105 */ unsplice(Node pred, Node s)1106 final void unsplice(Node pred, Node s) { 1107 s.waiter = null; // disable signals 1108 /* 1109 * See above for rationale. Briefly: if pred still points to 1110 * s, try to unlink s. If s cannot be unlinked, because it is 1111 * trailing node or pred might be unlinked, and neither pred 1112 * nor s are head or offlist, add to sweepVotes, and if enough 1113 * votes have accumulated, sweep. 1114 */ 1115 if (pred != null && pred != s && pred.next == s) { 1116 Node n = s.next; 1117 if (n == null || 1118 (n != s && pred.casNext(s, n) && pred.isMatched())) { 1119 for (;;) { // check if at, or could be, head 1120 Node h = head; 1121 if (h == pred || h == s || h == null) 1122 return; // at head or list empty 1123 if (!h.isMatched()) 1124 break; 1125 Node hn = h.next; 1126 if (hn == null) 1127 return; // now empty 1128 if (hn != h && casHead(h, hn)) 1129 h.forgetNext(); // advance head 1130 } 1131 if (pred.next != pred && s.next != s) { // recheck if offlist 1132 for (;;) { // sweep now if enough votes 1133 int v = sweepVotes; 1134 if (v < SWEEP_THRESHOLD) { 1135 if (casSweepVotes(v, v + 1)) 1136 break; 1137 } 1138 else if (casSweepVotes(v, 0)) { 1139 sweep(); 1140 break; 1141 } 1142 } 1143 } 1144 } 1145 } 1146 } 1147 1148 /** 1149 * Unlinks matched (typically cancelled) nodes encountered in a 1150 * traversal from head. 1151 */ sweep()1152 private void sweep() { 1153 for (Node p = head, s, n; p != null && (s = p.next) != null; ) { 1154 if (!s.isMatched()) 1155 // Unmatched nodes are never self-linked 1156 p = s; 1157 else if ((n = s.next) == null) // trailing node is pinned 1158 break; 1159 else if (s == n) // stale 1160 // No need to also check for p == s, since that implies s == n 1161 p = head; 1162 else 1163 p.casNext(s, n); 1164 } 1165 } 1166 1167 /** 1168 * Main implementation of remove(Object) 1169 */ findAndRemove(Object e)1170 private boolean findAndRemove(Object e) { 1171 if (e != null) { 1172 for (Node pred = null, p = head; p != null; ) { 1173 Object item = p.item; 1174 if (p.isData) { 1175 if (item != null && item != p && e.equals(item) && 1176 p.tryMatchData()) { 1177 unsplice(pred, p); 1178 return true; 1179 } 1180 } 1181 else if (item == null) 1182 break; 1183 pred = p; 1184 if ((p = p.next) == pred) { // stale 1185 pred = null; 1186 p = head; 1187 } 1188 } 1189 } 1190 return false; 1191 } 1192 1193 /** 1194 * Creates an initially empty {@code LinkedTransferQueue}. 1195 */ LinkedTransferQueue()1196 public LinkedTransferQueue() { 1197 } 1198 1199 /** 1200 * Creates a {@code LinkedTransferQueue} 1201 * initially containing the elements of the given collection, 1202 * added in traversal order of the collection's iterator. 1203 * 1204 * @param c the collection of elements to initially contain 1205 * @throws NullPointerException if the specified collection or any 1206 * of its elements are null 1207 */ LinkedTransferQueue(Collection<? extends E> c)1208 public LinkedTransferQueue(Collection<? extends E> c) { 1209 this(); 1210 addAll(c); 1211 } 1212 1213 /** 1214 * Inserts the specified element at the tail of this queue. 1215 * As the queue is unbounded, this method will never block. 1216 * 1217 * @throws NullPointerException if the specified element is null 1218 */ put(E e)1219 public void put(E e) { 1220 xfer(e, true, ASYNC, 0); 1221 } 1222 1223 /** 1224 * Inserts the specified element at the tail of this queue. 1225 * As the queue is unbounded, this method will never block or 1226 * return {@code false}. 1227 * 1228 * @return {@code true} (as specified by 1229 * {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit) 1230 * BlockingQueue.offer}) 1231 * @throws NullPointerException if the specified element is null 1232 */ offer(E e, long timeout, TimeUnit unit)1233 public boolean offer(E e, long timeout, TimeUnit unit) { 1234 xfer(e, true, ASYNC, 0); 1235 return true; 1236 } 1237 1238 /** 1239 * Inserts the specified element at the tail of this queue. 1240 * As the queue is unbounded, this method will never return {@code false}. 1241 * 1242 * @return {@code true} (as specified by {@link Queue#offer}) 1243 * @throws NullPointerException if the specified element is null 1244 */ offer(E e)1245 public boolean offer(E e) { 1246 xfer(e, true, ASYNC, 0); 1247 return true; 1248 } 1249 1250 /** 1251 * Inserts the specified element at the tail of this queue. 1252 * As the queue is unbounded, this method will never throw 1253 * {@link IllegalStateException} or return {@code false}. 1254 * 1255 * @return {@code true} (as specified by {@link Collection#add}) 1256 * @throws NullPointerException if the specified element is null 1257 */ add(E e)1258 public boolean add(E e) { 1259 xfer(e, true, ASYNC, 0); 1260 return true; 1261 } 1262 1263 /** 1264 * Transfers the element to a waiting consumer immediately, if possible. 1265 * 1266 * <p>More precisely, transfers the specified element immediately 1267 * if there exists a consumer already waiting to receive it (in 1268 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 1269 * otherwise returning {@code false} without enqueuing the element. 1270 * 1271 * @throws NullPointerException if the specified element is null 1272 */ tryTransfer(E e)1273 public boolean tryTransfer(E e) { 1274 return xfer(e, true, NOW, 0) == null; 1275 } 1276 1277 /** 1278 * Transfers the element to a consumer, waiting if necessary to do so. 1279 * 1280 * <p>More precisely, transfers the specified element immediately 1281 * if there exists a consumer already waiting to receive it (in 1282 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 1283 * else inserts the specified element at the tail of this queue 1284 * and waits until the element is received by a consumer. 1285 * 1286 * @throws NullPointerException if the specified element is null 1287 */ transfer(E e)1288 public void transfer(E e) throws InterruptedException { 1289 if (xfer(e, true, SYNC, 0) != null) { 1290 Thread.interrupted(); // failure possible only due to interrupt 1291 throw new InterruptedException(); 1292 } 1293 } 1294 1295 /** 1296 * Transfers the element to a consumer if it is possible to do so 1297 * before the timeout elapses. 1298 * 1299 * <p>More precisely, transfers the specified element immediately 1300 * if there exists a consumer already waiting to receive it (in 1301 * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 1302 * else inserts the specified element at the tail of this queue 1303 * and waits until the element is received by a consumer, 1304 * returning {@code false} if the specified wait time elapses 1305 * before the element can be transferred. 1306 * 1307 * @throws NullPointerException if the specified element is null 1308 */ tryTransfer(E e, long timeout, TimeUnit unit)1309 public boolean tryTransfer(E e, long timeout, TimeUnit unit) 1310 throws InterruptedException { 1311 if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) 1312 return true; 1313 if (!Thread.interrupted()) 1314 return false; 1315 throw new InterruptedException(); 1316 } 1317 take()1318 public E take() throws InterruptedException { 1319 E e = xfer(null, false, SYNC, 0); 1320 if (e != null) 1321 return e; 1322 Thread.interrupted(); 1323 throw new InterruptedException(); 1324 } 1325 poll(long timeout, TimeUnit unit)1326 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 1327 E e = xfer(null, false, TIMED, unit.toNanos(timeout)); 1328 if (e != null || !Thread.interrupted()) 1329 return e; 1330 throw new InterruptedException(); 1331 } 1332 poll()1333 public E poll() { 1334 return xfer(null, false, NOW, 0); 1335 } 1336 1337 /** 1338 * @throws NullPointerException {@inheritDoc} 1339 * @throws IllegalArgumentException {@inheritDoc} 1340 */ drainTo(Collection<? super E> c)1341 public int drainTo(Collection<? super E> c) { 1342 if (c == null) 1343 throw new NullPointerException(); 1344 if (c == this) 1345 throw new IllegalArgumentException(); 1346 int n = 0; 1347 for (E e; (e = poll()) != null;) { 1348 c.add(e); 1349 ++n; 1350 } 1351 return n; 1352 } 1353 1354 /** 1355 * @throws NullPointerException {@inheritDoc} 1356 * @throws IllegalArgumentException {@inheritDoc} 1357 */ drainTo(Collection<? super E> c, int maxElements)1358 public int drainTo(Collection<? super E> c, int maxElements) { 1359 if (c == null) 1360 throw new NullPointerException(); 1361 if (c == this) 1362 throw new IllegalArgumentException(); 1363 int n = 0; 1364 for (E e; n < maxElements && (e = poll()) != null;) { 1365 c.add(e); 1366 ++n; 1367 } 1368 return n; 1369 } 1370 1371 /** 1372 * Returns an iterator over the elements in this queue in proper sequence. 1373 * The elements will be returned in order from first (head) to last (tail). 1374 * 1375 * <p>The returned iterator is 1376 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1377 * 1378 * @return an iterator over the elements in this queue in proper sequence 1379 */ iterator()1380 public Iterator<E> iterator() { 1381 return new Itr(); 1382 } 1383 peek()1384 public E peek() { 1385 restartFromHead: for (;;) { 1386 for (Node p = head; p != null;) { 1387 Object item = p.item; 1388 if (p.isData) { 1389 if (item != null && item != p) { 1390 @SuppressWarnings("unchecked") E e = (E) item; 1391 return e; 1392 } 1393 } 1394 else if (item == null) 1395 break; 1396 if (p == (p = p.next)) 1397 continue restartFromHead; 1398 } 1399 return null; 1400 } 1401 } 1402 1403 /** 1404 * Returns {@code true} if this queue contains no elements. 1405 * 1406 * @return {@code true} if this queue contains no elements 1407 */ isEmpty()1408 public boolean isEmpty() { 1409 return firstDataNode() == null; 1410 } 1411 hasWaitingConsumer()1412 public boolean hasWaitingConsumer() { 1413 restartFromHead: for (;;) { 1414 for (Node p = head; p != null;) { 1415 Object item = p.item; 1416 if (p.isData) { 1417 if (item != null && item != p) 1418 break; 1419 } 1420 else if (item == null) 1421 return true; 1422 if (p == (p = p.next)) 1423 continue restartFromHead; 1424 } 1425 return false; 1426 } 1427 } 1428 1429 /** 1430 * Returns the number of elements in this queue. If this queue 1431 * contains more than {@code Integer.MAX_VALUE} elements, returns 1432 * {@code Integer.MAX_VALUE}. 1433 * 1434 * <p>Beware that, unlike in most collections, this method is 1435 * <em>NOT</em> a constant-time operation. Because of the 1436 * asynchronous nature of these queues, determining the current 1437 * number of elements requires an O(n) traversal. 1438 * 1439 * @return the number of elements in this queue 1440 */ size()1441 public int size() { 1442 return countOfMode(true); 1443 } 1444 getWaitingConsumerCount()1445 public int getWaitingConsumerCount() { 1446 return countOfMode(false); 1447 } 1448 1449 /** 1450 * Removes a single instance of the specified element from this queue, 1451 * if it is present. More formally, removes an element {@code e} such 1452 * that {@code o.equals(e)}, if this queue contains one or more such 1453 * elements. 1454 * Returns {@code true} if this queue contained the specified element 1455 * (or equivalently, if this queue changed as a result of the call). 1456 * 1457 * @param o element to be removed from this queue, if present 1458 * @return {@code true} if this queue changed as a result of the call 1459 */ remove(Object o)1460 public boolean remove(Object o) { 1461 return findAndRemove(o); 1462 } 1463 1464 /** 1465 * Returns {@code true} if this queue contains the specified element. 1466 * More formally, returns {@code true} if and only if this queue contains 1467 * at least one element {@code e} such that {@code o.equals(e)}. 1468 * 1469 * @param o object to be checked for containment in this queue 1470 * @return {@code true} if this queue contains the specified element 1471 */ contains(Object o)1472 public boolean contains(Object o) { 1473 if (o != null) { 1474 for (Node p = head; p != null; p = succ(p)) { 1475 Object item = p.item; 1476 if (p.isData) { 1477 if (item != null && item != p && o.equals(item)) 1478 return true; 1479 } 1480 else if (item == null) 1481 break; 1482 } 1483 } 1484 return false; 1485 } 1486 1487 /** 1488 * Always returns {@code Integer.MAX_VALUE} because a 1489 * {@code LinkedTransferQueue} is not capacity constrained. 1490 * 1491 * @return {@code Integer.MAX_VALUE} (as specified by 1492 * {@link java.util.concurrent.BlockingQueue#remainingCapacity() 1493 * BlockingQueue.remainingCapacity}) 1494 */ remainingCapacity()1495 public int remainingCapacity() { 1496 return Integer.MAX_VALUE; 1497 } 1498 1499 /** 1500 * Saves this queue to a stream (that is, serializes it). 1501 * 1502 * @param s the stream 1503 * @throws java.io.IOException if an I/O error occurs 1504 * @serialData All of the elements (each an {@code E}) in 1505 * the proper order, followed by a null 1506 */ writeObject(java.io.ObjectOutputStream s)1507 private void writeObject(java.io.ObjectOutputStream s) 1508 throws java.io.IOException { 1509 s.defaultWriteObject(); 1510 for (E e : this) 1511 s.writeObject(e); 1512 // Use trailing null as sentinel 1513 s.writeObject(null); 1514 } 1515 1516 /** 1517 * Reconstitutes this queue from a stream (that is, deserializes it). 1518 * @param s the stream 1519 * @throws ClassNotFoundException if the class of a serialized object 1520 * could not be found 1521 * @throws java.io.IOException if an I/O error occurs 1522 */ readObject(java.io.ObjectInputStream s)1523 private void readObject(java.io.ObjectInputStream s) 1524 throws java.io.IOException, ClassNotFoundException { 1525 s.defaultReadObject(); 1526 for (;;) { 1527 @SuppressWarnings("unchecked") 1528 E item = (E) s.readObject(); 1529 if (item == null) 1530 break; 1531 else 1532 offer(item); 1533 } 1534 } 1535 1536 // Unsafe mechanics 1537 1538 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 1539 private static final long HEAD; 1540 private static final long TAIL; 1541 private static final long SWEEPVOTES; 1542 static { 1543 try { 1544 HEAD = U.objectFieldOffset 1545 (LinkedTransferQueue.class.getDeclaredField("head")); 1546 TAIL = U.objectFieldOffset 1547 (LinkedTransferQueue.class.getDeclaredField("tail")); 1548 SWEEPVOTES = U.objectFieldOffset 1549 (LinkedTransferQueue.class.getDeclaredField("sweepVotes")); 1550 } catch (ReflectiveOperationException e) { 1551 throw new Error(e); 1552 } 1553 1554 // Reduce the risk of rare disastrous classloading in first call to 1555 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1556 Class<?> ensureLoaded = LockSupport.class; 1557 } 1558 } 1559