/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

// BEGIN android-note
// removed link to collections framework docs
// END android-note

/**
 * An unbounded {@link TransferQueue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out) with respect
 * to any given producer. The <em>head</em> of the queue is that
 * element that has been on the queue the longest time for some
 * producer. The <em>tail</em> of the queue is that element that has
 * been on the queue the shortest time for some producer.
 *
 * <p>Beware that, unlike in most collections, the {@code size} method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements, and so may report
 * inaccurate results if this collection is modified during traversal.
 * Additionally, the bulk operations {@code addAll},
 * {@code removeAll}, {@code retainAll}, {@code containsAll},
 * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
 * to be performed atomically. For example, an iterator operating
 * concurrently with an {@code addAll} operation might view only some
 * of the added elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
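 *
 * <p>A minimal usage sketch (the thread structure and variable names
 * here are illustrative only, not part of this API): {@code transfer}
 * hands an element directly to a consumer, blocking until one receives
 * it, while {@code offer} enqueues without waiting:
 *
 * <pre> {@code
 * LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
 *
 * // Consumer thread: blocks in take() until an element is available
 * Thread consumer = new Thread(() -> {
 *   try {
 *     System.out.println("received " + queue.take());
 *   } catch (InterruptedException ex) {
 *     Thread.currentThread().interrupt();
 *   }
 * });
 * consumer.start();
 *
 * try {
 *   queue.transfer("hello"); // returns only after a consumer receives it
 * } catch (InterruptedException ex) {
 *   Thread.currentThread().interrupt();
 * }
 * queue.offer("world");      // enqueues without waiting for a consumer
 * }</pre>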
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code LinkedTransferQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code LinkedTransferQueue} in another thread.
 *
 * @since 1.7
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * *** Overview of Dual Queues with Slack ***
     *
     * Dual Queues, introduced by Scherer and Scott
     * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf), are
     * (linked) queues in which nodes may represent either data or
     * requests. When a thread tries to enqueue a data node, but
     * encounters a request node, it instead "matches" and removes it;
     * and vice versa for enqueuing requests. Blocking Dual Queues
     * arrange that threads enqueuing unmatched requests block until
     * other threads provide the match. Dual Synchronous Queues (see
     * Scherer, Lea, & Scott
     * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
     * additionally arrange that threads enqueuing unmatched data also
     * block. Dual Transfer Queues support all of these modes, as
     * dictated by callers.
     *
     * A FIFO dual queue may be implemented using a variation of the
     * Michael & Scott (M&S) lock-free queue algorithm
     * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
     * It maintains two pointer fields, "head", pointing to a
     * (matched) node that in turn points to the first actual
     * (unmatched) queue node (or null if empty); and "tail" that
     * points to the last node on the queue (or again null if
     * empty). For example, here is a possible queue with four data
     * elements:
     *
     *  head                tail
     *    |                   |
     *    v                   v
     *    M -> U -> U -> U -> U
     *
     * The M&S queue algorithm is known to be prone to scalability and
     * overhead limitations when maintaining (via CAS) these head and
     * tail pointers. This has led to the development of
     * contention-reducing variants such as elimination arrays (see
     * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
     * optimistic back pointers (see Ladan-Mozes & Shavit
     * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
     * However, the nature of dual queues enables a simpler tactic for
     * improving M&S-style implementations when dual-ness is needed.
     *
     * In a dual queue, each node must atomically maintain its match
     * status. While there are other possible variants, we implement
     * this here as: for a data-mode node, matching entails CASing an
     * "item" field from a non-null data value to null upon match, and
     * vice-versa for request nodes, CASing from null to a data
     * value. (Note that the linearization properties of this style of
     * queue are easy to verify -- elements are made available by
     * linking, and unavailable by matching.) Compared to plain M&S
     * queues, this property of dual queues requires one additional
     * successful atomic operation per enq/deq pair. But it also
     * enables lower cost variants of queue maintenance mechanics. (A
     * variation of this idea applies even for non-dual queues that
     * support deletion of interior elements, such as
     * j.u.c.ConcurrentLinkedQueue.)
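     *
     * As a minimal sketch of the consumer side of this matching step
     * (illustrative only; the actual logic appears in method xfer
     * below), claiming the item of a data node p is a single CAS,
     * after which any producer parked on p is unparked:
     *
     *   Object item = p.item;
     *   if (p.isData && item != null && item != p &&
     *       p.casItem(item, null)) {      // try to claim the item
     *       LockSupport.unpark(p.waiter); // wake the matched producer
     *       // ... item now belongs to this consumer ...
     *   }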
     *
     * Once a node is matched, its match status can never again
     * change. We may thus arrange that the linked list of them
     * contain a prefix of zero or more matched nodes, followed by a
     * suffix of zero or more unmatched nodes. (Note that we allow
     * both the prefix and suffix to be zero length, which in turn
     * means that we do not use a dummy header.) If we were not
     * concerned with either time or space efficiency, we could
     * correctly perform enqueue and dequeue operations by traversing
     * from a pointer to the initial node; CASing the item of the
     * first unmatched node on match and CASing the next field of the
     * trailing node on appends. (Plus some special-casing when
     * initially empty.) While this would be a terrible idea in
     * itself, it does have the benefit of not requiring ANY atomic
     * updates on head/tail fields.
     *
     * We introduce here an approach that lies between the extremes of
     * never versus always updating queue (head and tail) pointers.
     * This offers a tradeoff between sometimes requiring extra
     * traversal steps to locate the first and/or last unmatched
     * nodes, versus the reduced overhead and contention of fewer
     * updates to queue pointers. For example, a possible snapshot of
     * a queue is:
     *
     *  head           tail
     *    |              |
     *    v              v
     *    M -> M -> U -> U -> U -> U
     *
     * The best value for this "slack" (the targeted maximum distance
     * between the value of "head" and the first unmatched node, and
     * similarly for "tail") is an empirical matter. We have found
     * that very small constants in the range of 1-3 work best over a
     * range of platforms. Larger values introduce increasing costs of
     * cache misses and risks of long traversal chains, while smaller
     * values increase CAS contention and overhead.
     *
     * Dual queues with slack differ from plain M&S dual queues by
     * virtue of only sometimes updating head or tail pointers when
     * matching, appending, or even traversing nodes, in order to
     * maintain a targeted slack. The idea of "sometimes" may be
     * operationalized in several ways. The simplest is to use a
     * per-operation counter incremented on each traversal step, and
     * to try (via CAS) to update the associated queue pointer
     * whenever the count exceeds a threshold. Another, that requires
     * more overhead, is to use random number generators to update
     * with a given probability per traversal step.
     *
     * In any strategy along these lines, because CASes updating
     * fields may fail, the actual slack may exceed targeted
     * slack. However, they may be retried at any time to maintain
     * targets. Even when using very small slack values, this
     * approach works well for dual queues because it allows all
     * operations up to the point of matching or appending an item
     * (hence potentially allowing progress by another thread) to be
     * read-only, thus not introducing any further contention. As
     * described below, we implement this by performing slack
     * maintenance retries only after these points.
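     *
     * As an illustrative sketch (condensed from the loop in method
     * xfer below): after matching a node p that was reached from head
     * h, the matching thread attempts to advance head by two nodes,
     * keeping the actual slack near the target:
     *
     *   if (p != h) {                     // slack is now >= 1
     *       Node n = p.next;              // try to skip past two nodes
     *       if (head == h && casHead(h, (n == null) ? p : n))
     *           h.forgetNext();           // self-link old head for GC
     *   }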
     *
     * As an accompaniment to such techniques, traversal overhead can
     * be further reduced without increasing contention of head
     * pointer updates: Threads may sometimes shortcut the "next" link
     * path from the current "head" node to be closer to the currently
     * known first unmatched node, and similarly for tail. Again, this
     * may be triggered using thresholds or randomization.
     *
     * These ideas must be further extended to avoid unbounded amounts
     * of costly-to-reclaim garbage caused by the sequential "next"
     * links of nodes starting at old forgotten head nodes: As first
     * described in detail by Boehm
     * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC
     * delays noticing that any arbitrarily old node has become
     * garbage, all newer dead nodes will also be unreclaimed.
     * (Similar issues arise in non-GC environments.) To cope with
     * this in our implementation, upon CASing to advance the head
     * pointer, we set the "next" link of the previous head to point
     * only to itself, thus limiting the length of connected dead lists.
     * (We also take similar care to wipe out possibly garbage
     * retaining values held in other Node fields.) However, doing so
     * adds some further complexity to traversal: If any "next"
     * pointer links to itself, it indicates that the current thread
     * has lagged behind a head-update, and so the traversal must
     * continue from the "head". Traversals trying to find the
     * current tail starting from "tail" may also encounter
     * self-links, in which case they also continue at "head".
     *
     * It is tempting in a slack-based scheme to not even use CAS for
     * updates (similarly to Ladan-Mozes & Shavit). However, this
     * cannot be done for head updates under the above link-forgetting
     * mechanics because an update may leave head at a detached node.
     * And while direct writes are possible for tail updates, they
     * increase the risk of long retraversals, and hence long garbage
     * chains, which can be much more costly than is worthwhile
     * considering that the cost difference of performing a CAS vs
     * write is smaller when they are not triggered on each operation
     * (especially considering that writes and CASes equally require
     * additional GC bookkeeping ("write barriers") that are sometimes
     * more costly than the writes themselves because of contention).
     *
     * *** Overview of implementation ***
     *
     * We use a threshold-based approach to updates, with a slack
     * threshold of two -- that is, we update head/tail when the
     * current pointer appears to be two or more steps away from the
     * first/last node. The slack value is hard-wired: a path greater
     * than one is naturally implemented by checking equality of
     * traversal pointers except when the list has only one element,
     * in which case we keep slack threshold at one. Avoiding tracking
     * explicit counts across method calls slightly simplifies an
     * already-messy implementation. Using randomization would
     * probably work better if there were a low-quality dirt-cheap
     * per-thread one available, but even ThreadLocalRandom is too
     * heavy for these purposes.
     *
     * With such a small slack threshold value, it is not worthwhile
     * to augment this with path short-circuiting (i.e., unsplicing
     * interior nodes) except in the case of cancellation/removal (see
     * below).
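     *
     * Traversals throughout this implementation cope with the
     * self-linked (off-list) nodes described above using a single
     * idiom, sketched here (this mirrors the pattern in
     * firstDataNode and other traversal methods below):
     *
     *   for (Node p = head; p != null;) {
     *       // ... examine p ...
     *       if (p == (p = p.next))        // p.next was self-linked
     *           p = head;                 // stale; restart at current head
     *   }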
     *
     * We allow both the head and tail fields to be null before any
     * nodes are enqueued, initializing upon first append. This
     * simplifies some other logic, as well as providing more
     * efficient explicit control paths instead of letting JVMs insert
     * implicit NullPointerExceptions when they are null. While not
     * currently fully implemented, we also leave open the possibility
     * of re-nulling these fields when empty (which is complicated to
     * arrange, for little benefit.)
     *
     * All enqueue/dequeue operations are handled by the single method
     * "xfer" with parameters indicating whether to act as some form
     * of offer, put, poll, take, or transfer (each possibly with
     * timeout). The relative complexity of using one monolithic
     * method outweighs the code bulk and maintenance problems of
     * using separate methods for each case.
     *
     * Operation consists of up to three phases. The first is
     * implemented within method xfer, the second in tryAppend, and
     * the third in method awaitMatch.
     *
     * 1. Try to match an existing node
     *
     *    Starting at head, skip already-matched nodes until finding
     *    an unmatched node of opposite mode, if one exists, in which
     *    case matching it and returning, also if necessary updating
     *    head to one past the matched node (or the node itself if the
     *    list has no other unmatched nodes). If the CAS misses, then
     *    a loop retries advancing head by two steps until either
     *    success or the slack is at most two. By requiring that each
     *    attempt advances head by two (if applicable), we ensure that
     *    the slack does not grow without bound. Traversals also check
     *    if the initial head is now off-list, in which case they
     *    start at the new head.
     *
     *    If no candidates are found and the call was untimed
     *    poll/offer (argument "how" is NOW), return.
     *
     * 2. Try to append a new node (method tryAppend)
     *
     *    Starting at current tail pointer, find the actual last node
     *    and try to append a new node (or if head was null, establish
     *    the first node). Nodes can be appended only if their
     *    predecessors are either already matched or are of the same
     *    mode. If we detect otherwise, then a new node with opposite
     *    mode must have been appended during traversal, so we must
     *    restart at phase 1. The traversal and update steps are
     *    otherwise similar to phase 1: Retrying upon CAS misses and
     *    checking for staleness. In particular, if a self-link is
     *    encountered, then we can safely jump to a node on the list
     *    by continuing the traversal at current head.
     *
     *    On successful append, if the call was ASYNC, return.
     *
     * 3. Await match or cancellation (method awaitMatch)
     *
     *    Wait for another thread to match node; instead cancelling if
     *    the current thread was interrupted or the wait timed out. On
     *    multiprocessors, we use front-of-queue spinning: If a node
     *    appears to be the first unmatched node in the queue, it
     *    spins a bit before blocking. In either case, before blocking
     *    it tries to unsplice any nodes between the current "head"
     *    and the first unmatched node.
     *
     *    Front-of-queue spinning vastly improves performance of
     *    heavily contended queues. And so long as it is relatively
     *    brief and "quiet", spinning does not much impact performance
     *    of less-contended queues.
     *    During spins, threads check their
     *    interrupt status and generate a thread-local random number
     *    to decide to occasionally perform a Thread.yield. While
     *    yield has underdefined specs, we assume that it might help,
     *    and will not hurt, in limiting impact of spinning on busy
     *    systems. We also use smaller (1/2) spins for nodes that are
     *    not known to be front but whose predecessors have not
     *    blocked -- these "chained" spins avoid artifacts of
     *    front-of-queue rules which otherwise lead to alternating
     *    nodes spinning vs blocking. Further, front threads that
     *    represent phase changes (from data to request node or vice
     *    versa) compared to their predecessors receive additional
     *    chained spins, reflecting longer paths typically required to
     *    unblock threads during phase changes.
     *
     * ** Unlinking removed interior nodes **
     *
     * In addition to minimizing garbage retention via self-linking
     * described above, we also unlink removed interior nodes. These
     * may arise due to timed out or interrupted waits, or calls to
     * remove(x) or Iterator.remove. Normally, given a node that was
     * at one time known to be the predecessor of some node s that is
     * to be removed, we can unsplice s by CASing the next field of
     * its predecessor if it still points to s (otherwise s must
     * already have been removed or is now offlist). But there are two
     * situations in which we cannot guarantee to make node s
     * unreachable in this way: (1) If s is the trailing node of list
     * (i.e., with null next), then it is pinned as the target node
     * for appends, so can only be removed later after other nodes are
     * appended. (2) We cannot necessarily unlink s given a
     * predecessor node that is matched (including the case of being
     * cancelled): the predecessor may already be unspliced, in which
     * case some previous reachable node may still point to s.
     * (For further explanation see Herlihy & Shavit "The Art of
     * Multiprocessor Programming" chapter 9). Although, in both
     * cases, we can rule out the need for further action if either s
     * or its predecessor are (or can be made to be) at, or fall off
     * from, the head of list.
     *
     * Without taking these into account, it would be possible for an
     * unbounded number of supposedly removed nodes to remain
     * reachable. Situations leading to such buildup are uncommon but
     * can occur in practice; for example when a series of short timed
     * calls to poll repeatedly time out but never otherwise fall off
     * the list because of an untimed call to take at the front of the
     * queue.
     *
     * When these cases arise, rather than always retraversing the
     * entire list to find an actual predecessor to unlink (which
     * won't help for case (1) anyway), we record a conservative
     * estimate of possible unsplice failures (in "sweepVotes").
     * We trigger a full sweep when the estimate exceeds a threshold
     * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
     * removal failures to tolerate before sweeping through, unlinking
     * cancelled nodes that were not unlinked upon initial removal.
     * We perform sweeps by the thread hitting the threshold (rather
     * than by background threads or by spreading work to other
     * threads) because in the main contexts in which removal occurs,
     * the caller is already timed-out, cancelled, or performing a
     * potentially O(n) operation (e.g. remove(x)), none of which are
     * time-critical enough to warrant the overhead that alternatives
     * would impose on other threads.
     *
     * Because the sweepVotes estimate is conservative, and because
     * nodes become unlinked "naturally" as they fall off the head of
     * the queue, and because we allow votes to accumulate even while
     * sweeps are in progress, there are typically significantly fewer
     * such nodes than estimated. Choice of a threshold value
     * balances the likelihood of wasted effort and contention, versus
     * providing a worst-case bound on retention of interior nodes in
     * quiescent queues. The value defined below was chosen
     * empirically to balance these under various timeout scenarios.
     *
     * Note that we cannot self-link unlinked interior nodes during
     * sweeps. However, the associated garbage chains terminate when
     * some successor ultimately falls off the head of the list and is
     * self-linked.
     */

    /** True if on multiprocessor */
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * The number of times to spin (with randomly interspersed calls
     * to Thread.yield) on multiprocessor before blocking when a node
     * is apparently the first waiter in the queue. See above for
     * explanation. Must be a power of two. The value is empirically
     * derived -- it works pretty well across a variety of processors,
     * numbers of CPUs, and OSes.
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * The number of times to spin before blocking when a node is
     * preceded by another node that is apparently spinning. Also
     * serves as an increment to FRONT_SPINS on phase changes, and as
     * base average frequency for yielding during spins. Must be a
     * power of two.
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     */
    static final int SWEEP_THRESHOLD = 32;

    /**
     * Queue nodes. Uses Object, not E, for items to allow forgetting
     * them after use. Relies heavily on Unsafe mechanics to minimize
     * unnecessary ordering constraints: Writes that are intrinsically
     * ordered wrt other accesses or CASes use simple relaxed forms.
     */
    static final class Node {
        final boolean isData;   // false if this is a request node
        volatile Object item;   // initially non-null if isData; CASed to match
        volatile Node next;
        volatile Thread waiter; // null until waiting

        // CAS methods for fields
        final boolean casNext(Node cmp, Node val) {
            return U.compareAndSwapObject(this, NEXT, cmp, val);
        }

        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return U.compareAndSwapObject(this, ITEM, cmp, val);
        }

        /**
         * Constructs a new node. Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(Object item, boolean isData) {
            U.putObject(this, ITEM, item); // relaxed write
            this.isData = isData;
        }

        /**
         * Links node to itself to avoid garbage retention. Called
         * only after CASing head field, so uses relaxed write.
         */
        final void forgetNext() {
            U.putObject(this, NEXT, this);
        }

        /**
         * Sets item to self and waiter to null, to avoid garbage
         * retention after matching or cancelling. Uses relaxed writes
         * because order is already constrained in the only calling
         * contexts: item is forgotten only after volatile/atomic
         * mechanics that extract items. Similarly, clearing waiter
         * follows either CAS or return from park (if ever parked;
         * else we don't care).
         */
        final void forgetContents() {
            U.putObject(this, ITEM, this);
            U.putObject(this, WAITER, null);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
         */
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * Returns true if this is an unmatched request node.
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * Returns true if a node with the given mode cannot be
         * appended to this node because this node is unmatched and
         * has opposite data mode.
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * Tries to artificially match a data node -- used by remove.
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        private static final long serialVersionUID = -3375979862319811754L;

        // Unsafe mechanics
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long ITEM;
        private static final long NEXT;
        private static final long WAITER;
        static {
            try {
                ITEM = U.objectFieldOffset
                    (Node.class.getDeclaredField("item"));
                NEXT = U.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
                WAITER = U.objectFieldOffset
                    (Node.class.getDeclaredField("waiter"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

    /** head of the queue; null until first enqueue */
    transient volatile Node head;

    /** tail of the queue; null until first append */
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice removed nodes */
    private transient volatile int sweepVotes;

    // CAS methods for fields
    private boolean casTail(Node cmp, Node val) {
        return U.compareAndSwapObject(this, TAIL, cmp, val);
    }

    private boolean casHead(Node cmp, Node val) {
        return U.compareAndSwapObject(this, HEAD, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val);
    }

    /*
     * Possible values for "how" argument in xfer method.
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll, tryTransfer

    /**
     * Implements all queuing methods. See above for explanation.
     *
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     * @return an item if matched, else e
     * @throws NullPointerException if haveData mode but e is null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched
                    if (isData == haveData)   // can't match
                        break;
                    if (p.casItem(item, e)) { // match
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head) == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        @SuppressWarnings("unchecked") E itemE = (E) item;
                        return itemE;
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }

    /**
     * Tries to append node s as tail.
     *
     * @param s the node to append
     * @param haveData true if appending in data mode
     * @return null on failure due to losing race with append in
     * different mode, else s's predecessor, or s itself if no
     * predecessor
     */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                if (casHead(null, s))
                    return s;                 // initialize
            }
            else if (p.cannotPrecede(haveData))
                return null;                  // lost race vs opposite mode
            else if ((n = p.next) != null)    // not last; keep traversing
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    (p != n) ? n : null;      // restart if off list
            else if (!p.casNext(null, s))
                p = p.next;                   // re-read on CAS failure
            else {
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail) != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);
                }
                return p;
            }
        }
    }

    /**
     * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or s itself if it has no
     * predecessor, or null if unknown (the null case does not occur
     * in any current calls but may in possible future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                @SuppressWarnings("unchecked") E itemE = (E) item;
                return itemE;
            }
            else if (w.isInterrupted() || (timed && nanos <= 0L)) {
                unsplice(pred, s);            // try to unlink and cancel
                if (s.casItem(e, s))          // return normally if lost CAS
                    return e;
            }
            else if (spins < 0) {             // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos > 0L)
                    LockSupport.parkNanos(this, nanos);
            }
            else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Returns spin/yield value for a node with given predecessor and
     * data mode. See above for explanation.
     */
    private static int spinsFor(Node pred, boolean haveData) {
        if (MP && pred != null) {
            if (pred.isData != haveData)      // phase change
                return FRONT_SPINS + CHAINED_SPINS;
            if (pred.isMatched())             // probably at front
                return FRONT_SPINS;
            if (pred.waiter == null)          // pred apparently spinning
                return CHAINED_SPINS;
        }
        return 0;
    }

    /* -------------- Traversal methods -------------- */

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node succ(Node p) {
        Node next = p.next;
        return (p == next) ? head : next;
    }

    /**
     * Returns the first unmatched data node, or null if none.
     * Callers must recheck if the returned node's item field is null
     * or self-linked before using.
     */
    final Node firstDataNode() {
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p)
                        return p;
                }
                else if (item == null)
                    break;
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return null;
        }
    }

    /**
     * Traverses and counts unmatched nodes of the given mode.
     * Used by methods size and getWaitingConsumerCount.
     */
    private int countOfMode(boolean data) {
        restartFromHead: for (;;) {
            int count = 0;
            for (Node p = head; p != null;) {
                if (!p.isMatched()) {
                    if (p.isData != data)
                        return 0;
                    if (++count == Integer.MAX_VALUE)
                        break;  // @see Collection.size()
                }
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return count;
        }
    }

    public String toString() {
        String[] a = null;
        restartFromHead: for (;;) {
            int charLength = 0;
            int size = 0;
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p) {
                        if (a == null)
                            a = new String[4];
                        else if (size == a.length)
                            a = Arrays.copyOf(a, 2 * size);
                        String s = item.toString();
                        a[size++] = s;
                        charLength += s.length();
                    }
                } else if (item == null)
                    break;
                if (p == (p = p.next))
                    continue restartFromHead;
            }

            if (size == 0)
                return "[]";

            return Helpers.toString(a, size, charLength);
        }
    }

    private Object[] toArrayInternal(Object[] a) {
        Object[] x = a;
        restartFromHead: for (;;) {
            int size = 0;
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p) {
                        if (x == null)
                            x = new Object[4];
                        else if (size == x.length)
                            x = Arrays.copyOf(x, 2 * (size + 4));
                        x[size++] = item;
                    }
                } else if (item == null)
                    break;
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            if (x == null)
                return new Object[0];
            else if (a != null && size <= a.length) {
                if (a != x)
                    System.arraycopy(x, 0, a, 0, size);
                if (size < a.length)
                    a[size] = null;
                return a;
            }
            return (size == x.length) ? x : Arrays.copyOf(x, size);
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue. (In other words, this method must allocate
     * a new array). The caller is thus free to modify the returned array.
     *
     * <p>This method acts as a bridge between array-based and
     * collection-based APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        return toArrayInternal(null);
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array. If the queue fits in the specified array, it
     * is returned therein. Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as a bridge
     * between array-based and collection-based APIs. Further, this method
     * allows precise control over the runtime type of the output array,
     * and may, under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        if (a == null) throw new NullPointerException();
        return (T[]) toArrayInternal(a);
    }

    final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         */
        private void advance(Node prev) {
            /*
             * To track and avoid buildup of deleted nodes in the face
             * of calls to both Queue.remove and Itr.remove, we must
             * include variants of unsplice and sweep upon each
             * advance: Upon Itr.remove, we may need to catch up links
             * from lastPred, and upon other removes, we might need to
             * skip ahead from stale nodes and unsplice deleted ones
             * found while advancing.
             */

            Node r, b; // reset lastPred upon possible deletion of lastRet
            if ((r = lastRet) != null && !r.isMatched())
                lastPred = r;    // next lastPred is old lastRet
            else if ((b = lastPred) == null || b.isMatched())
                lastPred = null; // at start of list
            else {
                Node s, n;       // help with removal of lastPred.next
                while ((s = b.next) != null &&
                       s != b && s.isMatched() &&
                       (n = s.next) != null && n != s)
                    b.casNext(s, n);
            }

            this.lastRet = prev;

            for (Node p = prev, s, n;;) {
                s = (p == null) ? head : p.next;
                if (s == null)
                    break;
                else if (s == p) {
                    p = null;
                    continue;
                }
                Object item = s.item;
                if (s.isData) {
                    if (item != null && item != s) {
                        @SuppressWarnings("unchecked") E itemE = (E) item;
                        nextItem = itemE;
                        nextNode = s;
                        return;
                    }
                }
                else if (item == null)
                    break;
                // assert s.isMatched();
                if (p == null)
                    p = s;
                else if ((n = s.next) == null)
                    break;
                else if (s == n)
                    p = null;
                else
                    p.casNext(s, n);
            }
            nextNode = null;
            nextItem = null;
        }

        Itr() {
            advance(null);
        }

        public final boolean hasNext() {
            return nextNode != null;
        }

        public final E next() {
            Node p = nextNode;
            if (p == null) throw new NoSuchElementException();
            E e = nextItem;
            advance(p);
            return e;
        }

        public final void remove() {
            final Node lastRet = this.lastRet;
            if (lastRet == null)
                throw new IllegalStateException();
            this.lastRet = null;
            if (lastRet.tryMatchData())
                unsplice(lastPred, lastRet);
        }
    }

    /** A customized variant of Spliterators.IteratorSpliterator */
    final class LTQSpliterator<E> implements Spliterator<E> {
        static final int MAX_BATCH = 1 << 25;  // max batch array size;
        Node current;       // current node; null until initialized
        int batch;          // batch size for splits
        boolean exhausted;  // true when no more nodes
        LTQSpliterator() {}

        public Spliterator<E> trySplit() {
            Node p;
            int b = batch;
            int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null) &&
                p.next != null) {
                Object[] a = new Object[n];
                int i = 0;
                do {
                    Object e = p.item;
                    if (e != p && (a[i] = e) != null)
                        ++i;
                    if (p == (p = p.next))
                        p = firstDataNode();
                } while (p != null && i < n && p.isData);
                if ((current = p) == null)
                    exhausted = true;
                if (i > 0) {
                    batch = i;
                    return Spliterators.spliterator
                        (a, 0, i, (Spliterator.ORDERED |
                                   Spliterator.NONNULL |
                                   Spliterator.CONCURRENT));
                }
            }
            return null;
        }

        @SuppressWarnings("unchecked")
        public void forEachRemaining(Consumer<? super E> action) {
            Node p;
            if (action == null) throw new NullPointerException();
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null)) {
                exhausted = true;
                do {
                    Object e = p.item;
                    if (e != null && e != p)
                        action.accept((E)e);
                    if (p == (p = p.next))
                        p = firstDataNode();
                } while (p != null && p.isData);
            }
        }

        @SuppressWarnings("unchecked")
        public boolean tryAdvance(Consumer<? super E> action) {
            Node p;
            if (action == null) throw new NullPointerException();
            if (!exhausted &&
                ((p = current) != null || (p = firstDataNode()) != null)) {
                Object e;
                do {
                    if ((e = p.item) == p)
                        e = null;
                    if (p == (p = p.next))
                        p = firstDataNode();
                } while (e == null && p != null && p.isData);
                if ((current = p) == null)
                    exhausted = true;
                if (e != null) {
                    action.accept((E)e);
                    return true;
                }
            }
            return false;
        }

        public long estimateSize() { return Long.MAX_VALUE; }

        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.NONNULL |
                Spliterator.CONCURRENT;
        }
    }

    /**
     * Returns a {@link Spliterator} over the elements in this queue.
     *
     * <p>The returned spliterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
     *
     * @implNote
     * The {@code Spliterator} implements {@code trySplit} to permit limited
     * parallelism.
     *
     * @return a {@code Spliterator} over the elements in this queue
     * @since 1.8
     */
    public Spliterator<E> spliterator() {
        return new LTQSpliterator<E>();
    }

    /* -------------- Removal methods -------------- */

    /**
     * Unsplices (now or later) the given deleted/cancelled node with
     * the given predecessor.
     *
     * @param pred a node that was at one time known to be the
     * predecessor of s, or null or s itself if s is/was at head
     * @param s the node to be unspliced
     */
    final void unsplice(Node pred, Node s) {
        s.waiter = null; // disable signals
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s. If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
         * nor s are head or offlist, add to sweepVotes, and if enough
         * votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
                for (;;) {               // check if at, or could be, head
                    Node h = head;
                    if (h == pred || h == s || h == null)
                        return;          // at head or list empty
                    if (!h.isMatched())
                        break;
                    Node hn = h.next;
                    if (hn == null)
                        return;          // now empty
                    if (hn != h && casHead(h, hn))
                        h.forgetNext();  // advance head
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            if (casSweepVotes(v, v + 1))
                                break;
                        }
                        else if (casSweepVotes(v, 0)) {
                            sweep();
                            break;
                        }
                    }
                }
            }
        }
    }

    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head.
     */
    private void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
                break;
            else if (s == n)    // stale
                // No need to also check for p == s, since that implies s == n
                p = head;
            else
                p.casNext(s, n);
        }
    }

    /**
     * Main implementation of remove(Object)
     */
    private boolean findAndRemove(Object e) {
        if (e != null) {
            for (Node pred = null, p = head; p != null; ) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p && e.equals(item) &&
                        p.tryMatchData()) {
                        unsplice(pred, p);
                        return true;
                    }
                }
                else if (item == null)
                    break;
                pred = p;
                if ((p = p.next) == pred) { // stale
                    pred = null;
                    p = head;
                }
            }
        }
        return false;
    }

    /**
     * Creates an initially empty {@code LinkedTransferQueue}.
     */
    public LinkedTransferQueue() {
    }

    /**
     * Creates a {@code LinkedTransferQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @return {@code true} (as specified by
     *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
     *  BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e) {
        return xfer(e, true, NOW, 0) == null;
    }

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else inserts the specified element at the tail of this queue
     * and waits until the element is received by a consumer.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void transfer(E e) throws InterruptedException {
        if (xfer(e, true, SYNC, 0) != null) {
            Thread.interrupted(); // failure possible only due to interrupt
            throw new InterruptedException();
        }
    }

    /**
     * Transfers the element to a consumer if it is possible to do so
     * before the timeout elapses.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else inserts the specified element at the tail of this queue
     * and waits until the element is received by a consumer,
     * returning {@code false} if the specified wait time elapses
     * before the element can be transferred.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }

    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E poll() {
        return xfer(null, false, NOW, 0);
    }

    /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        for (E e; n < maxElements && (e = poll()) != null;) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    public E peek() {
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p) {
                        @SuppressWarnings("unchecked") E e = (E) item;
                        return e;
                    }
                }
                else if (item == null)
                    break;
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return null;
        }
    }

    /**
     * Returns {@code true} if this queue contains no elements.
     *
     * @return {@code true} if this queue contains no elements
     */
    public boolean isEmpty() {
        return firstDataNode() == null;
    }

    public boolean hasWaitingConsumer() {
        restartFromHead: for (;;) {
            for (Node p = head; p != null;) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p)
                        break;
                }
                else if (item == null)
                    return true;
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return false;
        }
    }

    /**
     * Returns the number of elements in this queue. If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return countOfMode(true);
    }

    public int getWaitingConsumerCount() {
        return countOfMode(false);
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present. More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        return findAndRemove(o);
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o != null) {
            for (Node p = head; p != null; p = succ(p)) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p && o.equals(item))
                        return true;
                }
                else if (item == null)
                    break;
            }
        }
        return false;
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because a
     * {@code LinkedTransferQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE} (as specified by
     *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
     *         BlockingQueue.remainingCapacity})
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Saves this queue to a stream (that is, serializes it).
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     * @serialData All of the elements (each an {@code E}) in
     * the proper order, followed by a null
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        s.defaultWriteObject();
        for (E e : this)
            s.writeObject(e);
        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitutes this queue from a stream (that is, deserializes it).
     * @param s the stream
     * @throws ClassNotFoundException if the class of a serialized object
     *         could not be found
     * @throws java.io.IOException if an I/O error occurs
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E) s.readObject();
            if (item == null)
                break;
            else
                offer(item);
        }
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long HEAD;
    private static final long TAIL;
    private static final long SWEEPVOTES;
    static {
        try {
            HEAD = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("head"));
            TAIL = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("tail"));
            SWEEPVOTES = U.objectFieldOffset
                (LinkedTransferQueue.class.getDeclaredField("sweepVotes"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }
}