1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.ArrayList; 41 import java.util.Arrays; 42 import java.util.List; 43 import java.util.concurrent.locks.LockSupport; 44 import java.util.concurrent.locks.ReentrantLock; 45 import java.util.function.BiConsumer; 46 import java.util.function.BiPredicate; 47 import java.util.function.Consumer; 48 import static java.util.concurrent.Flow.Publisher; 49 import static java.util.concurrent.Flow.Subscriber; 50 import static java.util.concurrent.Flow.Subscription; 51 52 /** 53 * A {@link Flow.Publisher} that asynchronously issues submitted 54 * (non-null) items to current subscribers until it is closed. Each 55 * current subscriber receives newly submitted items in the same order 56 * unless drops or exceptions are encountered. Using a 57 * SubmissionPublisher allows item generators to act as compliant <a 58 * href="http://www.reactive-streams.org/"> reactive-streams</a> 59 * Publishers relying on drop handling and/or blocking for flow 60 * control. 61 * 62 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its 63 * constructor for delivery to subscribers. The best choice of 64 * Executor depends on expected usage. If the generator(s) of 65 * submitted items run in separate threads, and the number of 66 * subscribers can be estimated, consider using a {@link 67 * Executors#newFixedThreadPool}. Otherwise consider using the 68 * default, normally the {@link ForkJoinPool#commonPool}. 69 * 70 * <p>Buffering allows producers and consumers to transiently operate 71 * at different rates. Each subscriber uses an independent buffer. 72 * Buffers are created upon first use and expanded as needed up to the 73 * given maximum. (The enforced capacity may be rounded up to the 74 * nearest power of two and/or bounded by the largest value supported 75 * by this implementation.) Invocations of {@link 76 * Flow.Subscription#request(long) request} do not directly result in 77 * buffer expansion, but risk saturation if unfilled requests exceed 78 * the maximum capacity. The default value of {@link 79 * Flow#defaultBufferSize()} may provide a useful starting point for 80 * choosing a capacity based on expected rates, resources, and usages. 81 * 82 * <p>A single SubmissionPublisher may be shared among multiple 83 * sources. Actions in a source thread prior to publishing an item or 84 * issuing a signal <a href="package-summary.html#MemoryVisibility"> 85 * <i>happen-before</i></a> actions subsequent to the corresponding 86 * access by each subscriber. But reported estimates of lag and demand 87 * are designed for use in monitoring, not for synchronization 88 * control, and may reflect stale or inaccurate views of progress. 89 * 90 * <p>Publication methods support different policies about what to do 91 * when buffers are saturated. Method {@link #submit(Object) submit} 92 * blocks until resources are available. This is simplest, but least 93 * responsive. The {@code offer} methods may drop items (either 94 * immediately or with bounded timeout), but provide an opportunity to 95 * interpose a handler and then retry. 96 * 97 * <p>If any Subscriber method throws an exception, its subscription 98 * is cancelled. If a handler is supplied as a constructor argument, 99 * it is invoked before cancellation upon an exception in method 100 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods 101 * {@link Flow.Subscriber#onSubscribe onSubscribe}, 102 * {@link Flow.Subscriber#onError(Throwable) onError} and 103 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or 104 * handled before cancellation. If the supplied Executor throws 105 * {@link RejectedExecutionException} (or any other RuntimeException 106 * or Error) when attempting to execute a task, or a drop handler 107 * throws an exception when processing a dropped item, then the 108 * exception is rethrown. In these cases, not all subscribers will 109 * have been issued the published item. It is usually good practice to 110 * {@link #closeExceptionally closeExceptionally} in these cases. 111 * 112 * <p>Method {@link #consume(Consumer)} simplifies support for a 113 * common case in which the only action of a subscriber is to request 114 * and process all items using a supplied function. 115 * 116 * <p>This class may also serve as a convenient base for subclasses 117 * that generate items, and use the methods in this class to publish 118 * them. For example here is a class that periodically publishes the 119 * items generated from a supplier. (In practice you might add methods 120 * to independently start and stop generation, to share Executors 121 * among publishers, and so on, or use a SubmissionPublisher as a 122 * component rather than a superclass.) 123 * 124 * <pre> {@code 125 * class PeriodicPublisher<T> extends SubmissionPublisher<T> { 126 * final ScheduledFuture<?> periodicTask; 127 * final ScheduledExecutorService scheduler; 128 * PeriodicPublisher(Executor executor, int maxBufferCapacity, 129 * Supplier<? extends T> supplier, 130 * long period, TimeUnit unit) { 131 * super(executor, maxBufferCapacity); 132 * scheduler = new ScheduledThreadPoolExecutor(1); 133 * periodicTask = scheduler.scheduleAtFixedRate( 134 * () -> submit(supplier.get()), 0, period, unit); 135 * } 136 * public void close() { 137 * periodicTask.cancel(false); 138 * scheduler.shutdown(); 139 * super.close(); 140 * } 141 * }}</pre> 142 * 143 * <p>Here is an example of a {@link Flow.Processor} implementation. 144 * It uses single-step requests to its publisher for simplicity of 145 * illustration. A more adaptive version could monitor flow using the 146 * lag estimate returned from {@code submit}, along with other utility 147 * methods. 148 * 149 * <pre> {@code 150 * class TransformProcessor<S,T> extends SubmissionPublisher<T> 151 * implements Flow.Processor<S,T> { 152 * final Function<? super S, ? extends T> function; 153 * Flow.Subscription subscription; 154 * TransformProcessor(Executor executor, int maxBufferCapacity, 155 * Function<? super S, ? extends T> function) { 156 * super(executor, maxBufferCapacity); 157 * this.function = function; 158 * } 159 * public void onSubscribe(Flow.Subscription subscription) { 160 * (this.subscription = subscription).request(1); 161 * } 162 * public void onNext(S item) { 163 * subscription.request(1); 164 * submit(function.apply(item)); 165 * } 166 * public void onError(Throwable ex) { closeExceptionally(ex); } 167 * public void onComplete() { close(); } 168 * }}</pre> 169 * 170 * @param <T> the published item type 171 * @author Doug Lea 172 * @since 9 173 */ 174 public class SubmissionPublisher<T> implements Publisher<T>, 175 AutoCloseable { 176 /* 177 * Most mechanics are handled by BufferedSubscription. This class 178 * mainly tracks subscribers and ensures sequentiality, by using 179 * locks across public methods, to ensure thread-safety in the 180 * presence of multiple sources and maintain acquire-release 181 * ordering around user operations. However, we also track whether 182 * there is only a single source, and if so streamline some buffer 183 * operations by avoiding some atomics. 184 */ 185 186 /** The largest possible power of two array size. */ 187 static final int BUFFER_CAPACITY_LIMIT = 1 << 30; 188 189 /** 190 * Initial buffer capacity used when maxBufferCapacity is 191 * greater. Must be a power of two. 192 */ 193 static final int INITIAL_CAPACITY = 32; 194 195 /** Round capacity to power of 2, at most limit. */ roundCapacity(int cap)196 static final int roundCapacity(int cap) { 197 int n = cap - 1; 198 n |= n >>> 1; 199 n |= n >>> 2; 200 n |= n >>> 4; 201 n |= n >>> 8; 202 n |= n >>> 16; 203 return (n <= 0) ? 1 : // at least 1 204 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1; 205 } 206 207 // default Executor setup; nearly the same as CompletableFuture 208 209 /** 210 * Default executor -- ForkJoinPool.commonPool() unless it cannot 211 * support parallelism. 212 */ 213 private static final Executor ASYNC_POOL = 214 (ForkJoinPool.getCommonPoolParallelism() > 1) ? 215 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 216 217 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 218 private static final class ThreadPerTaskExecutor implements Executor { ThreadPerTaskExecutor()219 ThreadPerTaskExecutor() {} // prevent access constructor creation execute(Runnable r)220 public void execute(Runnable r) { new Thread(r).start(); } 221 } 222 223 /** 224 * Clients (BufferedSubscriptions) are maintained in a linked list 225 * (via their "next" fields). This works well for publish loops. 226 * It requires O(n) traversal to check for duplicate subscribers, 227 * but we expect that subscribing is much less common than 228 * publishing. Unsubscribing occurs only during traversal loops, 229 * when BufferedSubscription methods return negative values 230 * signifying that they have been closed. To reduce 231 * head-of-line blocking, submit and offer methods first call 232 * BufferedSubscription.offer on each subscriber, and place 233 * saturated ones in retries list (using nextRetry field), and 234 * retry, possibly blocking or dropping. 235 */ 236 BufferedSubscription<T> clients; 237 238 /** Lock for exclusion across multiple sources */ 239 final ReentrantLock lock; 240 /** Run status, updated only within locks */ 241 volatile boolean closed; 242 /** Set true on first call to subscribe, to initialize possible owner */ 243 boolean subscribed; 244 /** The first caller thread to subscribe, or null if thread ever changed */ 245 Thread owner; 246 /** If non-null, the exception in closeExceptionally */ 247 volatile Throwable closedException; 248 249 // Parameters for constructing BufferedSubscriptions 250 final Executor executor; 251 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler; 252 final int maxBufferCapacity; 253 254 /** 255 * Creates a new SubmissionPublisher using the given Executor for 256 * async delivery to subscribers, with the given maximum buffer size 257 * for each subscriber, and, if non-null, the given handler invoked 258 * when any Subscriber throws an exception in method {@link 259 * Flow.Subscriber#onNext(Object) onNext}. 260 * 261 * @param executor the executor to use for async delivery, 262 * supporting creation of at least one independent thread 263 * @param maxBufferCapacity the maximum capacity for each 264 * subscriber's buffer (the enforced capacity may be rounded up to 265 * the nearest power of two and/or bounded by the largest value 266 * supported by this implementation; method {@link #getMaxBufferCapacity} 267 * returns the actual value) 268 * @param handler if non-null, procedure to invoke upon exception 269 * thrown in method {@code onNext} 270 * @throws NullPointerException if executor is null 271 * @throws IllegalArgumentException if maxBufferCapacity not 272 * positive 273 */ SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)274 public SubmissionPublisher(Executor executor, int maxBufferCapacity, 275 BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler) { 276 if (executor == null) 277 throw new NullPointerException(); 278 if (maxBufferCapacity <= 0) 279 throw new IllegalArgumentException("capacity must be positive"); 280 this.lock = new ReentrantLock(); 281 this.executor = executor; 282 this.onNextHandler = handler; 283 this.maxBufferCapacity = roundCapacity(maxBufferCapacity); 284 } 285 286 /** 287 * Creates a new SubmissionPublisher using the given Executor for 288 * async delivery to subscribers, with the given maximum buffer size 289 * for each subscriber, and no handler for Subscriber exceptions in 290 * method {@link Flow.Subscriber#onNext(Object) onNext}. 291 * 292 * @param executor the executor to use for async delivery, 293 * supporting creation of at least one independent thread 294 * @param maxBufferCapacity the maximum capacity for each 295 * subscriber's buffer (the enforced capacity may be rounded up to 296 * the nearest power of two and/or bounded by the largest value 297 * supported by this implementation; method {@link #getMaxBufferCapacity} 298 * returns the actual value) 299 * @throws NullPointerException if executor is null 300 * @throws IllegalArgumentException if maxBufferCapacity not 301 * positive 302 */ SubmissionPublisher(Executor executor, int maxBufferCapacity)303 public SubmissionPublisher(Executor executor, int maxBufferCapacity) { 304 this(executor, maxBufferCapacity, null); 305 } 306 307 /** 308 * Creates a new SubmissionPublisher using the {@link 309 * ForkJoinPool#commonPool()} for async delivery to subscribers 310 * (unless it does not support a parallelism level of at least two, 311 * in which case, a new Thread is created to run each task), with 312 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no 313 * handler for Subscriber exceptions in method {@link 314 * Flow.Subscriber#onNext(Object) onNext}. 315 */ SubmissionPublisher()316 public SubmissionPublisher() { 317 this(ASYNC_POOL, Flow.defaultBufferSize(), null); 318 } 319 320 /** 321 * Adds the given Subscriber unless already subscribed. If already 322 * subscribed, the Subscriber's {@link 323 * Flow.Subscriber#onError(Throwable) onError} method is invoked on 324 * the existing subscription with an {@link IllegalStateException}. 325 * Otherwise, upon success, the Subscriber's {@link 326 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked 327 * asynchronously with a new {@link Flow.Subscription}. If {@link 328 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the 329 * subscription is cancelled. Otherwise, if this SubmissionPublisher 330 * was closed exceptionally, then the subscriber's {@link 331 * Flow.Subscriber#onError onError} method is invoked with the 332 * corresponding exception, or if closed without exception, the 333 * subscriber's {@link Flow.Subscriber#onComplete() onComplete} 334 * method is invoked. Subscribers may enable receiving items by 335 * invoking the {@link Flow.Subscription#request(long) request} 336 * method of the new Subscription, and may unsubscribe by invoking 337 * its {@link Flow.Subscription#cancel() cancel} method. 338 * 339 * @param subscriber the subscriber 340 * @throws NullPointerException if subscriber is null 341 */ subscribe(Subscriber<? super T> subscriber)342 public void subscribe(Subscriber<? super T> subscriber) { 343 if (subscriber == null) throw new NullPointerException(); 344 ReentrantLock lock = this.lock; 345 int max = maxBufferCapacity; // allocate initial array 346 Object[] array = new Object[max < INITIAL_CAPACITY ? 347 max : INITIAL_CAPACITY]; 348 BufferedSubscription<T> subscription = 349 new BufferedSubscription<T>(subscriber, executor, onNextHandler, 350 array, max); 351 lock.lock(); 352 try { 353 if (!subscribed) { 354 subscribed = true; 355 owner = Thread.currentThread(); 356 } 357 for (BufferedSubscription<T> b = clients, pred = null;;) { 358 if (b == null) { 359 Throwable ex; 360 subscription.onSubscribe(); 361 if ((ex = closedException) != null) 362 subscription.onError(ex); 363 else if (closed) 364 subscription.onComplete(); 365 else if (pred == null) 366 clients = subscription; 367 else 368 pred.next = subscription; 369 break; 370 } 371 BufferedSubscription<T> next = b.next; 372 if (b.isClosed()) { // remove 373 b.next = null; // detach 374 if (pred == null) 375 clients = next; 376 else 377 pred.next = next; 378 } 379 else if (subscriber.equals(b.subscriber)) { 380 b.onError(new IllegalStateException("Duplicate subscribe")); 381 break; 382 } 383 else 384 pred = b; 385 b = next; 386 } 387 } finally { 388 lock.unlock(); 389 } 390 } 391 392 /** 393 * Common implementation for all three forms of submit and offer. 394 * Acts as submit if nanos == Long.MAX_VALUE, else offer. 395 */ doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop)396 private int doOffer(T item, long nanos, 397 BiPredicate<Subscriber<? super T>, ? super T> onDrop) { 398 if (item == null) throw new NullPointerException(); 399 int lag = 0; 400 boolean complete, unowned; 401 ReentrantLock lock = this.lock; 402 lock.lock(); 403 try { 404 Thread t = Thread.currentThread(), o; 405 BufferedSubscription<T> b = clients; 406 if ((unowned = ((o = owner) != t)) && o != null) 407 owner = null; // disable bias 408 if (b == null) 409 complete = closed; 410 else { 411 complete = false; 412 boolean cleanMe = false; 413 BufferedSubscription<T> retries = null, rtail = null, next; 414 do { 415 next = b.next; 416 int stat = b.offer(item, unowned); 417 if (stat == 0) { // saturated; add to retry list 418 b.nextRetry = null; // avoid garbage on exceptions 419 if (rtail == null) 420 retries = b; 421 else 422 rtail.nextRetry = b; 423 rtail = b; 424 } 425 else if (stat < 0) // closed 426 cleanMe = true; // remove later 427 else if (stat > lag) 428 lag = stat; 429 } while ((b = next) != null); 430 431 if (retries != null || cleanMe) 432 lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); 433 } 434 } finally { 435 lock.unlock(); 436 } 437 if (complete) 438 throw new IllegalStateException("Closed"); 439 else 440 return lag; 441 } 442 443 /** 444 * Helps, (timed) waits for, and/or drops buffers on list; returns 445 * lag or negative drops (for use in offer). 446 */ retryOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop, BufferedSubscription<T> retries, int lag, boolean cleanMe)447 private int retryOffer(T item, long nanos, 448 BiPredicate<Subscriber<? super T>, ? super T> onDrop, 449 BufferedSubscription<T> retries, int lag, 450 boolean cleanMe) { 451 for (BufferedSubscription<T> r = retries; r != null;) { 452 BufferedSubscription<T> nextRetry = r.nextRetry; 453 r.nextRetry = null; 454 if (nanos > 0L) 455 r.awaitSpace(nanos); 456 int stat = r.retryOffer(item); 457 if (stat == 0 && onDrop != null && onDrop.test(r.subscriber, item)) 458 stat = r.retryOffer(item); 459 if (stat == 0) 460 lag = (lag >= 0) ? -1 : lag - 1; 461 else if (stat < 0) 462 cleanMe = true; 463 else if (lag >= 0 && stat > lag) 464 lag = stat; 465 r = nextRetry; 466 } 467 if (cleanMe) 468 cleanAndCount(); 469 return lag; 470 } 471 472 /** 473 * Returns current list count after removing closed subscribers. 474 * Call only while holding lock. Used mainly by retryOffer for 475 * cleanup. 476 */ cleanAndCount()477 private int cleanAndCount() { 478 int count = 0; 479 BufferedSubscription<T> pred = null, next; 480 for (BufferedSubscription<T> b = clients; b != null; b = next) { 481 next = b.next; 482 if (b.isClosed()) { 483 b.next = null; 484 if (pred == null) 485 clients = next; 486 else 487 pred.next = next; 488 } 489 else { 490 pred = b; 491 ++count; 492 } 493 } 494 return count; 495 } 496 497 /** 498 * Publishes the given item to each current subscriber by 499 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object) 500 * onNext} method, blocking uninterruptibly while resources for any 501 * subscriber are unavailable. This method returns an estimate of 502 * the maximum lag (number of items submitted but not yet consumed) 503 * among all current subscribers. This value is at least one 504 * (accounting for this submitted item) if there are any 505 * subscribers, else zero. 506 * 507 * <p>If the Executor for this publisher throws a 508 * RejectedExecutionException (or any other RuntimeException or 509 * Error) when attempting to asynchronously notify subscribers, 510 * then this exception is rethrown, in which case not all 511 * subscribers will have been issued this item. 512 * 513 * @param item the (non-null) item to publish 514 * @return the estimated maximum lag among subscribers 515 * @throws IllegalStateException if closed 516 * @throws NullPointerException if item is null 517 * @throws RejectedExecutionException if thrown by Executor 518 */ submit(T item)519 public int submit(T item) { 520 return doOffer(item, Long.MAX_VALUE, null); 521 } 522 523 /** 524 * Publishes the given item, if possible, to each current subscriber 525 * by asynchronously invoking its {@link 526 * Flow.Subscriber#onNext(Object) onNext} method. The item may be 527 * dropped by one or more subscribers if resource limits are 528 * exceeded, in which case the given handler (if non-null) is 529 * invoked, and if it returns true, retried once. Other calls to 530 * methods in this class by other threads are blocked while the 531 * handler is invoked. Unless recovery is assured, options are 532 * usually limited to logging the error and/or issuing an {@link 533 * Flow.Subscriber#onError(Throwable) onError} signal to the 534 * subscriber. 535 * 536 * <p>This method returns a status indicator: If negative, it 537 * represents the (negative) number of drops (failed attempts to 538 * issue the item to a subscriber). Otherwise it is an estimate of 539 * the maximum lag (number of items submitted but not yet 540 * consumed) among all current subscribers. This value is at least 541 * one (accounting for this submitted item) if there are any 542 * subscribers, else zero. 543 * 544 * <p>If the Executor for this publisher throws a 545 * RejectedExecutionException (or any other RuntimeException or 546 * Error) when attempting to asynchronously notify subscribers, or 547 * the drop handler throws an exception when processing a dropped 548 * item, then this exception is rethrown. 549 * 550 * @param item the (non-null) item to publish 551 * @param onDrop if non-null, the handler invoked upon a drop to a 552 * subscriber, with arguments of the subscriber and item; if it 553 * returns true, an offer is re-attempted (once) 554 * @return if negative, the (negative) number of drops; otherwise 555 * an estimate of maximum lag 556 * @throws IllegalStateException if closed 557 * @throws NullPointerException if item is null 558 * @throws RejectedExecutionException if thrown by Executor 559 */ offer(T item, BiPredicate<Subscriber<? super T>, ? super T> onDrop)560 public int offer(T item, 561 BiPredicate<Subscriber<? super T>, ? super T> onDrop) { 562 return doOffer(item, 0L, onDrop); 563 } 564 565 /** 566 * Publishes the given item, if possible, to each current subscriber 567 * by asynchronously invoking its {@link 568 * Flow.Subscriber#onNext(Object) onNext} method, blocking while 569 * resources for any subscription are unavailable, up to the 570 * specified timeout or until the caller thread is interrupted, at 571 * which point the given handler (if non-null) is invoked, and if it 572 * returns true, retried once. (The drop handler may distinguish 573 * timeouts from interrupts by checking whether the current thread 574 * is interrupted.) Other calls to methods in this class by other 575 * threads are blocked while the handler is invoked. Unless 576 * recovery is assured, options are usually limited to logging the 577 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable) 578 * onError} signal to the subscriber. 579 * 580 * <p>This method returns a status indicator: If negative, it 581 * represents the (negative) number of drops (failed attempts to 582 * issue the item to a subscriber). Otherwise it is an estimate of 583 * the maximum lag (number of items submitted but not yet 584 * consumed) among all current subscribers. This value is at least 585 * one (accounting for this submitted item) if there are any 586 * subscribers, else zero. 587 * 588 * <p>If the Executor for this publisher throws a 589 * RejectedExecutionException (or any other RuntimeException or 590 * Error) when attempting to asynchronously notify subscribers, or 591 * the drop handler throws an exception when processing a dropped 592 * item, then this exception is rethrown. 593 * 594 * @param item the (non-null) item to publish 595 * @param timeout how long to wait for resources for any subscriber 596 * before giving up, in units of {@code unit} 597 * @param unit a {@code TimeUnit} determining how to interpret the 598 * {@code timeout} parameter 599 * @param onDrop if non-null, the handler invoked upon a drop to a 600 * subscriber, with arguments of the subscriber and item; if it 601 * returns true, an offer is re-attempted (once) 602 * @return if negative, the (negative) number of drops; otherwise 603 * an estimate of maximum lag 604 * @throws IllegalStateException if closed 605 * @throws NullPointerException if item is null 606 * @throws RejectedExecutionException if thrown by Executor 607 */ offer(T item, long timeout, TimeUnit unit, BiPredicate<Subscriber<? super T>, ? super T> onDrop)608 public int offer(T item, long timeout, TimeUnit unit, 609 BiPredicate<Subscriber<? super T>, ? super T> onDrop) { 610 long nanos = unit.toNanos(timeout); 611 // distinguishes from untimed (only wrt interrupt policy) 612 if (nanos == Long.MAX_VALUE) --nanos; 613 return doOffer(item, nanos, onDrop); 614 } 615 616 /** 617 * Unless already closed, issues {@link 618 * Flow.Subscriber#onComplete() onComplete} signals to current 619 * subscribers, and disallows subsequent attempts to publish. 620 * Upon return, this method does <em>NOT</em> guarantee that all 621 * subscribers have yet completed. 622 */ close()623 public void close() { 624 ReentrantLock lock = this.lock; 625 if (!closed) { 626 BufferedSubscription<T> b; 627 lock.lock(); 628 try { 629 // no need to re-check closed here 630 b = clients; 631 clients = null; 632 owner = null; 633 closed = true; 634 } finally { 635 lock.unlock(); 636 } 637 while (b != null) { 638 BufferedSubscription<T> next = b.next; 639 b.next = null; 640 b.onComplete(); 641 b = next; 642 } 643 } 644 } 645 646 /** 647 * Unless already closed, issues {@link 648 * Flow.Subscriber#onError(Throwable) onError} signals to current 649 * subscribers with the given error, and disallows subsequent 650 * attempts to publish. Future subscribers also receive the given 651 * error. Upon return, this method does <em>NOT</em> guarantee 652 * that all subscribers have yet completed. 653 * 654 * @param error the {@code onError} argument sent to subscribers 655 * @throws NullPointerException if error is null 656 */ closeExceptionally(Throwable error)657 public void closeExceptionally(Throwable error) { 658 if (error == null) 659 throw new NullPointerException(); 660 ReentrantLock lock = this.lock; 661 if (!closed) { 662 BufferedSubscription<T> b; 663 lock.lock(); 664 try { 665 b = clients; 666 if (!closed) { // don't clobber racing close 667 closedException = error; 668 clients = null; 669 owner = null; 670 closed = true; 671 } 672 } finally { 673 lock.unlock(); 674 } 675 while (b != null) { 676 BufferedSubscription<T> next = b.next; 677 b.next = null; 678 b.onError(error); 679 b = next; 680 } 681 } 682 } 683 684 /** 685 * Returns true if this publisher is not accepting submissions. 686 * 687 * @return true if closed 688 */ isClosed()689 public boolean isClosed() { 690 return closed; 691 } 692 693 /** 694 * Returns the exception associated with {@link 695 * #closeExceptionally(Throwable) closeExceptionally}, or null if 696 * not closed or if closed normally. 697 * 698 * @return the exception, or null if none 699 */ getClosedException()700 public Throwable getClosedException() { 701 return closedException; 702 } 703 704 /** 705 * Returns true if this publisher has any subscribers. 706 * 707 * @return true if this publisher has any subscribers 708 */ hasSubscribers()709 public boolean hasSubscribers() { 710 boolean nonEmpty = false; 711 ReentrantLock lock = this.lock; 712 lock.lock(); 713 try { 714 for (BufferedSubscription<T> b = clients; b != null;) { 715 BufferedSubscription<T> next = b.next; 716 if (b.isClosed()) { 717 b.next = null; 718 b = clients = next; 719 } 720 else { 721 nonEmpty = true; 722 break; 723 } 724 } 725 } finally { 726 lock.unlock(); 727 } 728 return nonEmpty; 729 } 730 731 /** 732 * Returns the number of current subscribers. 733 * 734 * @return the number of current subscribers 735 */ getNumberOfSubscribers()736 public int getNumberOfSubscribers() { 737 int n; 738 ReentrantLock lock = this.lock; 739 lock.lock(); 740 try { 741 n = cleanAndCount(); 742 } finally { 743 lock.unlock(); 744 } 745 return n; 746 } 747 748 /** 749 * Returns the Executor used for asynchronous delivery. 750 * 751 * @return the Executor used for asynchronous delivery 752 */ getExecutor()753 public Executor getExecutor() { 754 return executor; 755 } 756 757 /** 758 * Returns the maximum per-subscriber buffer capacity. 759 * 760 * @return the maximum per-subscriber buffer capacity 761 */ getMaxBufferCapacity()762 public int getMaxBufferCapacity() { 763 return maxBufferCapacity; 764 } 765 766 /** 767 * Returns a list of current subscribers for monitoring and 768 * tracking purposes, not for invoking {@link Flow.Subscriber} 769 * methods on the subscribers. 770 * 771 * @return list of current subscribers 772 */ getSubscribers()773 public List<Subscriber<? super T>> getSubscribers() { 774 ArrayList<Subscriber<? super T>> subs = new ArrayList<>(); 775 ReentrantLock lock = this.lock; 776 lock.lock(); 777 try { 778 BufferedSubscription<T> pred = null, next; 779 for (BufferedSubscription<T> b = clients; b != null; b = next) { 780 next = b.next; 781 if (b.isClosed()) { 782 b.next = null; 783 if (pred == null) 784 clients = next; 785 else 786 pred.next = next; 787 } 788 else { 789 subs.add(b.subscriber); 790 pred = b; 791 } 792 } 793 } finally { 794 lock.unlock(); 795 } 796 return subs; 797 } 798 799 /** 800 * Returns true if the given Subscriber is currently subscribed. 801 * 802 * @param subscriber the subscriber 803 * @return true if currently subscribed 804 * @throws NullPointerException if subscriber is null 805 */ isSubscribed(Subscriber<? super T> subscriber)806 public boolean isSubscribed(Subscriber<? super T> subscriber) { 807 if (subscriber == null) throw new NullPointerException(); 808 boolean subscribed = false; 809 ReentrantLock lock = this.lock; 810 if (!closed) { 811 lock.lock(); 812 try { 813 BufferedSubscription<T> pred = null, next; 814 for (BufferedSubscription<T> b = clients; b != null; b = next) { 815 next = b.next; 816 if (b.isClosed()) { 817 b.next = null; 818 if (pred == null) 819 clients = next; 820 else 821 pred.next = next; 822 } 823 else if (subscribed = subscriber.equals(b.subscriber)) 824 break; 825 else 826 pred = b; 827 } 828 } finally { 829 lock.unlock(); 830 } 831 } 832 return subscribed; 833 } 834 835 /** 836 * Returns an estimate of the minimum number of items requested 837 * (via {@link Flow.Subscription#request(long) request}) but not 838 * yet produced, among all current subscribers. 839 * 840 * @return the estimate, or zero if no subscribers 841 */ estimateMinimumDemand()842 public long estimateMinimumDemand() { 843 long min = Long.MAX_VALUE; 844 boolean nonEmpty = false; 845 ReentrantLock lock = this.lock; 846 lock.lock(); 847 try { 848 BufferedSubscription<T> pred = null, next; 849 for (BufferedSubscription<T> b = clients; b != null; b = next) { 850 int n; long d; 851 next = b.next; 852 if ((n = b.estimateLag()) < 0) { 853 b.next = null; 854 if (pred == null) 855 clients = next; 856 else 857 pred.next = next; 858 } 859 else { 860 if ((d = b.demand - n) < min) 861 min = d; 862 nonEmpty = true; 863 pred = b; 864 } 865 } 866 } finally { 867 lock.unlock(); 868 } 869 return nonEmpty ? min : 0; 870 } 871 872 /** 873 * Returns an estimate of the maximum number of items produced but 874 * not yet consumed among all current subscribers. 875 * 876 * @return the estimate 877 */ estimateMaximumLag()878 public int estimateMaximumLag() { 879 int max = 0; 880 ReentrantLock lock = this.lock; 881 lock.lock(); 882 try { 883 BufferedSubscription<T> pred = null, next; 884 for (BufferedSubscription<T> b = clients; b != null; b = next) { 885 int n; 886 next = b.next; 887 if ((n = b.estimateLag()) < 0) { 888 b.next = null; 889 if (pred == null) 890 clients = next; 891 else 892 pred.next = next; 893 } 894 else { 895 if (n > max) 896 max = n; 897 pred = b; 898 } 899 } 900 } finally { 901 lock.unlock(); 902 } 903 return max; 904 } 905 906 /** 907 * Processes all published items using the given Consumer function. 908 * Returns a CompletableFuture that is completed normally when this 909 * publisher signals {@link Flow.Subscriber#onComplete() 910 * onComplete}, or completed exceptionally upon any error, or an 911 * exception is thrown by the Consumer, or the returned 912 * CompletableFuture is cancelled, in which case no further items 913 * are processed. 914 * 915 * @param consumer the function applied to each onNext item 916 * @return a CompletableFuture that is completed normally 917 * when the publisher signals onComplete, and exceptionally 918 * upon any error or cancellation 919 * @throws NullPointerException if consumer is null 920 */ consume(Consumer<? super T> consumer)921 public CompletableFuture<Void> consume(Consumer<? super T> consumer) { 922 if (consumer == null) 923 throw new NullPointerException(); 924 CompletableFuture<Void> status = new CompletableFuture<>(); 925 subscribe(new ConsumerSubscriber<T>(status, consumer)); 926 return status; 927 } 928 929 /** Subscriber for method consume */ 930 static final class ConsumerSubscriber<T> implements Subscriber<T> { 931 final CompletableFuture<Void> status; 932 final Consumer<? super T> consumer; 933 Subscription subscription; ConsumerSubscriber(CompletableFuture<Void> status, Consumer<? super T> consumer)934 ConsumerSubscriber(CompletableFuture<Void> status, 935 Consumer<? super T> consumer) { 936 this.status = status; this.consumer = consumer; 937 } onSubscribe(Subscription subscription)938 public final void onSubscribe(Subscription subscription) { 939 this.subscription = subscription; 940 status.whenComplete((v, e) -> subscription.cancel()); 941 if (!status.isDone()) 942 subscription.request(Long.MAX_VALUE); 943 } onError(Throwable ex)944 public final void onError(Throwable ex) { 945 status.completeExceptionally(ex); 946 } onComplete()947 public final void onComplete() { 948 status.complete(null); 949 } onNext(T item)950 public final void onNext(T item) { 951 try { 952 consumer.accept(item); 953 } catch (Throwable ex) { 954 subscription.cancel(); 955 status.completeExceptionally(ex); 956 } 957 } 958 } 959 960 /** 961 * A task for consuming buffer items and signals, created and 962 * executed whenever they become available. A task consumes as 963 * many items/signals as possible before terminating, at which 964 * point another task is created when needed. The dual Runnable 965 * and ForkJoinTask declaration saves overhead when executed by 966 * ForkJoinPools, without impacting other kinds of Executors. 967 */ 968 @SuppressWarnings("serial") 969 static final class ConsumerTask<T> extends ForkJoinTask<Void> 970 implements Runnable, CompletableFuture.AsynchronousCompletionTask { 971 final BufferedSubscription<T> consumer; ConsumerTask(BufferedSubscription<T> consumer)972 ConsumerTask(BufferedSubscription<T> consumer) { 973 this.consumer = consumer; 974 } getRawResult()975 public final Void getRawResult() { return null; } setRawResult(Void v)976 public final void setRawResult(Void v) {} exec()977 public final boolean exec() { consumer.consume(); return false; } run()978 public final void run() { consumer.consume(); } 979 } 980 981 /** 982 * A resizable array-based ring buffer with integrated control to 983 * start a consumer task whenever items are available. The buffer 984 * algorithm is specialized for the case of at most one concurrent 985 * producer and consumer, and power of two buffer sizes. It relies 986 * primarily on atomic operations (CAS or getAndSet) at the next 987 * array slot to put or take an element, at the "tail" and "head" 988 * indices written only by the producer and consumer respectively. 989 * 990 * We ensure internally that there is at most one active consumer 991 * task at any given time. The publisher guarantees a single 992 * producer via its lock. Sync among producers and consumers 993 * relies on volatile fields "ctl", "demand", and "waiting" (along 994 * with element access). Other variables are accessed in plain 995 * mode, relying on outer ordering and exclusion, and/or enclosing 996 * them within other volatile accesses. Some atomic operations are 997 * avoided by tracking single threaded ownership by producers (in 998 * the style of biased locking). 999 * 1000 * Execution control and protocol state are managed using field 1001 * "ctl". Methods to subscribe, close, request, and cancel set 1002 * ctl bits (mostly using atomic boolean method getAndBitwiseOr), 1003 * and ensure that a task is running. (The corresponding consumer 1004 * side actions are in method consume.) To avoid starting a new 1005 * task on each action, ctl also includes a keep-alive bit 1006 * (ACTIVE) that is refreshed if needed on producer actions. 1007 * (Maintaining agreement about keep-alives requires most atomic 1008 * updates to be full SC/Volatile strength, which is still much 1009 * cheaper than using one task per item.) Error signals 1010 * additionally null out items and/or fields to reduce termination 1011 * latency. The cancel() method is supported by treating as ERROR 1012 * but suppressing onError signal. 1013 * 1014 * Support for blocking also exploits the fact that there is only 1015 * one possible waiter. ManagedBlocker-compatible control fields 1016 * are placed in this class itself rather than in wait-nodes. 1017 * Blocking control relies on the "waiting" and "waiter" 1018 * fields. Producers set them before trying to block. Signalling 1019 * unparks and clears fields. If the producer and/or consumer are 1020 * using a ForkJoinPool, the producer attempts to help run 1021 * consumer tasks via ForkJoinPool.helpAsyncBlocker before 1022 * blocking. 1023 * 1024 * Usages of this class may encounter any of several forms of 1025 * memory contention. We try to ameliorate across them without 1026 * unduly impacting footprints in low-contention usages where it 1027 * isn't needed. Buffer arrays start out small and grow only as 1028 * needed. The class uses @Contended and heuristic field 1029 * declaration ordering to reduce false-sharing memory contention 1030 * across instances of BufferedSubscription (as in, multiple 1031 * subscribers per publisher). We additionally segregate some 1032 * fields that would otherwise nearly always encounter cache line 1033 * contention among producers and consumers. To reduce contention 1034 * across time (vs space), consumers only periodically update 1035 * other fields (see method takeItems), at the expense of possibly 1036 * staler reporting of lags and demand (bounded at 12.5% == 1/8 1037 * capacity) and possibly more atomic operations. 1038 * 1039 * Other forms of imbalance and slowdowns can occur during startup 1040 * when producer and consumer methods are compiled and/or memory 1041 * is allocated at different rates. This is ameliorated by 1042 * artificially subdividing some consumer methods, including 1043 * isolation of all subscriber callbacks. This code also includes 1044 * typical power-of-two array screening idioms to avoid compilers 1045 * generating traps, along with the usual SSA-based inline 1046 * assignment coding style. Also, all methods and fields have 1047 * default visibility to simplify usage by callers. 1048 */ 1049 @SuppressWarnings("serial") 1050 @jdk.internal.vm.annotation.Contended 1051 static final class BufferedSubscription<T> 1052 implements Subscription, ForkJoinPool.ManagedBlocker { 1053 long timeout; // Long.MAX_VALUE if untimed wait 1054 int head; // next position to take 1055 int tail; // next position to put 1056 final int maxCapacity; // max buffer size 1057 volatile int ctl; // atomic run state flags 1058 Object[] array; // buffer 1059 final Subscriber<? super T> subscriber; 1060 final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler; 1061 Executor executor; // null on error 1062 Thread waiter; // blocked producer thread 1063 Throwable pendingError; // holds until onError issued 1064 BufferedSubscription<T> next; // used only by publisher 1065 BufferedSubscription<T> nextRetry; // used only by publisher 1066 1067 @jdk.internal.vm.annotation.Contended("c") // segregate 1068 volatile long demand; // # unfilled requests 1069 @jdk.internal.vm.annotation.Contended("c") 1070 volatile int waiting; // nonzero if producer blocked 1071 1072 // ctl bit values 1073 static final int CLOSED = 0x01; // if set, other bits ignored 1074 static final int ACTIVE = 0x02; // keep-alive for consumer task 1075 static final int REQS = 0x04; // (possibly) nonzero demand 1076 static final int ERROR = 0x08; // issues onError when noticed 1077 static final int COMPLETE = 0x10; // issues onComplete when done 1078 static final int RUN = 0x20; // task is or will be running 1079 static final int OPEN = 0x40; // true after subscribe 1080 1081 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel 1082 BufferedSubscription(Subscriber<? super T> subscriber, Executor executor, BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler, Object[] array, int maxBufferCapacity)1083 BufferedSubscription(Subscriber<? super T> subscriber, 1084 Executor executor, 1085 BiConsumer<? super Subscriber<? super T>, 1086 ? super Throwable> onNextHandler, 1087 Object[] array, 1088 int maxBufferCapacity) { 1089 this.subscriber = subscriber; 1090 this.executor = executor; 1091 this.onNextHandler = onNextHandler; 1092 this.array = array; 1093 this.maxCapacity = maxBufferCapacity; 1094 } 1095 1096 // Wrappers for some VarHandle methods 1097 weakCasCtl(int cmp, int val)1098 final boolean weakCasCtl(int cmp, int val) { 1099 return CTL.weakCompareAndSet(this, cmp, val); 1100 } 1101 getAndBitwiseOrCtl(int bits)1102 final int getAndBitwiseOrCtl(int bits) { 1103 return (int)CTL.getAndBitwiseOr(this, bits); 1104 } 1105 subtractDemand(int k)1106 final long subtractDemand(int k) { 1107 long n = (long)(-k); 1108 return n + (long)DEMAND.getAndAdd(this, n); 1109 } 1110 casDemand(long cmp, long val)1111 final boolean casDemand(long cmp, long val) { 1112 return DEMAND.compareAndSet(this, cmp, val); 1113 } 1114 1115 // Utilities used by SubmissionPublisher 1116 1117 /** 1118 * Returns true if closed (consumer task may still be running). 1119 */ isClosed()1120 final boolean isClosed() { 1121 return (ctl & CLOSED) != 0; 1122 } 1123 1124 /** 1125 * Returns estimated number of buffered items, or negative if 1126 * closed. 1127 */ estimateLag()1128 final int estimateLag() { 1129 int c = ctl, n = tail - head; 1130 return ((c & CLOSED) != 0) ? -1 : (n < 0) ? 0 : n; 1131 } 1132 1133 // Methods for submitting items 1134 1135 /** 1136 * Tries to add item and start consumer task if necessary. 1137 * @return negative if closed, 0 if saturated, else estimated lag 1138 */ offer(T item, boolean unowned)1139 final int offer(T item, boolean unowned) { 1140 Object[] a; 1141 int stat = 0, cap = ((a = array) == null) ? 0 : a.length; 1142 int t = tail, i = t & (cap - 1), n = t + 1 - head; 1143 if (cap > 0) { 1144 boolean added; 1145 if (n >= cap && cap < maxCapacity) // resize 1146 added = growAndOffer(item, a, t); 1147 else if (n >= cap || unowned) // need volatile CAS 1148 added = QA.compareAndSet(a, i, null, item); 1149 else { // can use release mode 1150 QA.setRelease(a, i, item); 1151 added = true; 1152 } 1153 if (added) { 1154 tail = t + 1; 1155 stat = n; 1156 } 1157 } 1158 return startOnOffer(stat); 1159 } 1160 1161 /** 1162 * Tries to expand buffer and add item, returning true on 1163 * success. Currently fails only if out of memory. 1164 */ growAndOffer(T item, Object[] a, int t)1165 final boolean growAndOffer(T item, Object[] a, int t) { 1166 int cap = 0, newCap = 0; 1167 Object[] newArray = null; 1168 if (a != null && (cap = a.length) > 0 && (newCap = cap << 1) > 0) { 1169 try { 1170 newArray = new Object[newCap]; 1171 } catch (OutOfMemoryError ex) { 1172 } 1173 } 1174 if (newArray == null) 1175 return false; 1176 else { // take and move items 1177 int newMask = newCap - 1; 1178 newArray[t-- & newMask] = item; 1179 for (int mask = cap - 1, k = mask; k >= 0; --k) { 1180 Object x = QA.getAndSet(a, t & mask, null); 1181 if (x == null) 1182 break; // already consumed 1183 else 1184 newArray[t-- & newMask] = x; 1185 } 1186 array = newArray; 1187 VarHandle.releaseFence(); // release array and slots 1188 return true; 1189 } 1190 } 1191 1192 /** 1193 * Version of offer for retries (no resize or bias) 1194 */ retryOffer(T item)1195 final int retryOffer(T item) { 1196 Object[] a; 1197 int stat = 0, t = tail, h = head, cap; 1198 if ((a = array) != null && (cap = a.length) > 0 && 1199 QA.compareAndSet(a, (cap - 1) & t, null, item)) 1200 stat = (tail = t + 1) - h; 1201 return startOnOffer(stat); 1202 } 1203 1204 /** 1205 * Tries to start consumer task after offer. 1206 * @return negative if now closed, else argument 1207 */ startOnOffer(int stat)1208 final int startOnOffer(int stat) { 1209 int c; // start or keep alive if requests exist and not active 1210 if (((c = ctl) & (REQS | ACTIVE)) == REQS && 1211 ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0) 1212 tryStart(); 1213 else if ((c & CLOSED) != 0) 1214 stat = -1; 1215 return stat; 1216 } 1217 1218 /** 1219 * Tries to start consumer task. Sets error state on failure. 1220 */ tryStart()1221 final void tryStart() { 1222 try { 1223 Executor e; 1224 ConsumerTask<T> task = new ConsumerTask<T>(this); 1225 if ((e = executor) != null) // skip if disabled on error 1226 e.execute(task); 1227 } catch (RuntimeException | Error ex) { 1228 getAndBitwiseOrCtl(ERROR | CLOSED); 1229 throw ex; 1230 } 1231 } 1232 1233 // Signals to consumer tasks 1234 1235 /** 1236 * Sets the given control bits, starting task if not running or closed. 1237 * @param bits state bits, assumed to include RUN but not CLOSED 1238 */ startOnSignal(int bits)1239 final void startOnSignal(int bits) { 1240 if ((ctl & bits) != bits && 1241 (getAndBitwiseOrCtl(bits) & (RUN | CLOSED)) == 0) 1242 tryStart(); 1243 } 1244 onSubscribe()1245 final void onSubscribe() { 1246 startOnSignal(RUN | ACTIVE); 1247 } 1248 onComplete()1249 final void onComplete() { 1250 startOnSignal(RUN | ACTIVE | COMPLETE); 1251 } 1252 onError(Throwable ex)1253 final void onError(Throwable ex) { 1254 int c; Object[] a; // to null out buffer on async error 1255 if (ex != null) 1256 pendingError = ex; // races are OK 1257 if (((c = getAndBitwiseOrCtl(ERROR | RUN | ACTIVE)) & CLOSED) == 0) { 1258 if ((c & RUN) == 0) 1259 tryStart(); 1260 else if ((a = array) != null) 1261 Arrays.fill(a, null); 1262 } 1263 } 1264 cancel()1265 public final void cancel() { 1266 onError(null); 1267 } 1268 request(long n)1269 public final void request(long n) { 1270 if (n > 0L) { 1271 for (;;) { 1272 long p = demand, d = p + n; // saturate 1273 if (casDemand(p, d < p ? Long.MAX_VALUE : d)) 1274 break; 1275 } 1276 startOnSignal(RUN | ACTIVE | REQS); 1277 } 1278 else 1279 onError(new IllegalArgumentException( 1280 "non-positive subscription request")); 1281 } 1282 1283 // Consumer task actions 1284 1285 /** 1286 * Consumer loop, called from ConsumerTask, or indirectly when 1287 * helping during submit. 1288 */ consume()1289 final void consume() { 1290 Subscriber<? super T> s; 1291 if ((s = subscriber) != null) { // hoist checks 1292 subscribeOnOpen(s); 1293 long d = demand; 1294 for (int h = head, t = tail;;) { 1295 int c, taken; boolean empty; 1296 if (((c = ctl) & ERROR) != 0) { 1297 closeOnError(s, null); 1298 break; 1299 } 1300 else if ((taken = takeItems(s, d, h)) > 0) { 1301 head = h += taken; 1302 d = subtractDemand(taken); 1303 } 1304 else if ((d = demand) == 0L && (c & REQS) != 0) 1305 weakCasCtl(c, c & ~REQS); // exhausted demand 1306 else if (d != 0L && (c & REQS) == 0) 1307 weakCasCtl(c, c | REQS); // new demand 1308 else if (t == (t = tail)) { // stability check 1309 if ((empty = (t == h)) && (c & COMPLETE) != 0) { 1310 closeOnComplete(s); // end of stream 1311 break; 1312 } 1313 else if (empty || d == 0L) { 1314 int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN; 1315 if (weakCasCtl(c, c & ~bit) && bit == RUN) 1316 break; // un-keep-alive or exit 1317 } 1318 } 1319 } 1320 } 1321 } 1322 1323 /** 1324 * Consumes some items until unavailable or bound or error. 1325 * 1326 * @param s subscriber 1327 * @param d current demand 1328 * @param h current head 1329 * @return number taken 1330 */ takeItems(Subscriber<? super T> s, long d, int h)1331 final int takeItems(Subscriber<? super T> s, long d, int h) { 1332 Object[] a; 1333 int k = 0, cap; 1334 if ((a = array) != null && (cap = a.length) > 0) { 1335 int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8) 1336 int n = (d < (long)b) ? (int)d : b; 1337 for (; k < n; ++h, ++k) { 1338 Object x = QA.getAndSet(a, h & m, null); 1339 if (waiting != 0) 1340 signalWaiter(); 1341 if (x == null) 1342 break; 1343 else if (!consumeNext(s, x)) 1344 break; 1345 } 1346 } 1347 return k; 1348 } 1349 consumeNext(Subscriber<? super T> s, Object x)1350 final boolean consumeNext(Subscriber<? super T> s, Object x) { 1351 try { 1352 @SuppressWarnings("unchecked") T y = (T) x; 1353 if (s != null) 1354 s.onNext(y); 1355 return true; 1356 } catch (Throwable ex) { 1357 handleOnNext(s, ex); 1358 return false; 1359 } 1360 } 1361 1362 /** 1363 * Processes exception in Subscriber.onNext. 1364 */ handleOnNext(Subscriber<? super T> s, Throwable ex)1365 final void handleOnNext(Subscriber<? super T> s, Throwable ex) { 1366 BiConsumer<? super Subscriber<? super T>, ? super Throwable> h; 1367 try { 1368 if ((h = onNextHandler) != null) 1369 h.accept(s, ex); 1370 } catch (Throwable ignore) { 1371 } 1372 closeOnError(s, ex); 1373 } 1374 1375 /** 1376 * Issues subscriber.onSubscribe if this is first signal. 1377 */ subscribeOnOpen(Subscriber<? super T> s)1378 final void subscribeOnOpen(Subscriber<? super T> s) { 1379 if ((ctl & OPEN) == 0 && (getAndBitwiseOrCtl(OPEN) & OPEN) == 0) 1380 consumeSubscribe(s); 1381 } 1382 consumeSubscribe(Subscriber<? super T> s)1383 final void consumeSubscribe(Subscriber<? super T> s) { 1384 try { 1385 if (s != null) // ignore if disabled 1386 s.onSubscribe(this); 1387 } catch (Throwable ex) { 1388 closeOnError(s, ex); 1389 } 1390 } 1391 1392 /** 1393 * Issues subscriber.onComplete unless already closed. 1394 */ closeOnComplete(Subscriber<? super T> s)1395 final void closeOnComplete(Subscriber<? super T> s) { 1396 if ((getAndBitwiseOrCtl(CLOSED) & CLOSED) == 0) 1397 consumeComplete(s); 1398 } 1399 consumeComplete(Subscriber<? super T> s)1400 final void consumeComplete(Subscriber<? super T> s) { 1401 try { 1402 if (s != null) 1403 s.onComplete(); 1404 } catch (Throwable ignore) { 1405 } 1406 } 1407 1408 /** 1409 * Issues subscriber.onError, and unblocks producer if needed. 1410 */ closeOnError(Subscriber<? super T> s, Throwable ex)1411 final void closeOnError(Subscriber<? super T> s, Throwable ex) { 1412 if ((getAndBitwiseOrCtl(ERROR | CLOSED) & CLOSED) == 0) { 1413 if (ex == null) 1414 ex = pendingError; 1415 pendingError = null; // detach 1416 executor = null; // suppress racing start calls 1417 signalWaiter(); 1418 consumeError(s, ex); 1419 } 1420 } 1421 consumeError(Subscriber<? super T> s, Throwable ex)1422 final void consumeError(Subscriber<? super T> s, Throwable ex) { 1423 try { 1424 if (ex != null && s != null) 1425 s.onError(ex); 1426 } catch (Throwable ignore) { 1427 } 1428 } 1429 1430 // Blocking support 1431 1432 /** 1433 * Unblocks waiting producer. 1434 */ signalWaiter()1435 final void signalWaiter() { 1436 Thread w; 1437 waiting = 0; 1438 if ((w = waiter) != null) 1439 LockSupport.unpark(w); 1440 } 1441 1442 /** 1443 * Returns true if closed or space available. 1444 * For ManagedBlocker. 1445 */ isReleasable()1446 public final boolean isReleasable() { 1447 Object[] a; int cap; 1448 return ((ctl & CLOSED) != 0 || 1449 ((a = array) != null && (cap = a.length) > 0 && 1450 QA.getAcquire(a, (cap - 1) & tail) == null)); 1451 } 1452 1453 /** 1454 * Helps or blocks until timeout, closed, or space available. 1455 */ awaitSpace(long nanos)1456 final void awaitSpace(long nanos) { 1457 if (!isReleasable()) { 1458 ForkJoinPool.helpAsyncBlocker(executor, this); 1459 if (!isReleasable()) { 1460 timeout = nanos; 1461 try { 1462 ForkJoinPool.managedBlock(this); 1463 } catch (InterruptedException ie) { 1464 timeout = INTERRUPTED; 1465 } 1466 if (timeout == INTERRUPTED) 1467 Thread.currentThread().interrupt(); 1468 } 1469 } 1470 } 1471 1472 /** 1473 * Blocks until closed, space available or timeout. 1474 * For ManagedBlocker. 1475 */ block()1476 public final boolean block() { 1477 long nanos = timeout; 1478 boolean timed = (nanos < Long.MAX_VALUE); 1479 long deadline = timed ? System.nanoTime() + nanos : 0L; 1480 while (!isReleasable()) { 1481 if (Thread.interrupted()) { 1482 timeout = INTERRUPTED; 1483 if (timed) 1484 break; 1485 } 1486 else if (timed && (nanos = deadline - System.nanoTime()) <= 0L) 1487 break; 1488 else if (waiter == null) 1489 waiter = Thread.currentThread(); 1490 else if (waiting == 0) 1491 waiting = 1; 1492 else if (timed) 1493 LockSupport.parkNanos(this, nanos); 1494 else 1495 LockSupport.park(this); 1496 } 1497 waiter = null; 1498 waiting = 0; 1499 return true; 1500 } 1501 1502 // VarHandle mechanics 1503 static final VarHandle CTL; 1504 static final VarHandle DEMAND; 1505 static final VarHandle QA; 1506 1507 static { 1508 try { 1509 MethodHandles.Lookup l = MethodHandles.lookup(); 1510 CTL = l.findVarHandle(BufferedSubscription.class, "ctl", 1511 int.class); 1512 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand", 1513 long.class); 1514 QA = MethodHandles.arrayElementVarHandle(Object[].class); 1515 } catch (ReflectiveOperationException e) { 1516 throw new ExceptionInInitializerError(e); 1517 } 1518 1519 // Reduce the risk of rare disastrous classloading in first call to 1520 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1521 Class<?> ensureLoaded = LockSupport.class; 1522 } 1523 } 1524 } 1525