1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25/* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36package java.util.concurrent; 37 38import java.lang.invoke.MethodHandles; 39import java.lang.invoke.VarHandle; 40import java.util.ArrayList; 41import java.util.List; 42import java.util.concurrent.locks.LockSupport; 43import java.util.function.BiConsumer; 44import java.util.function.BiPredicate; 45import java.util.function.Consumer; 46 47/** 48 * A {@link Flow.Publisher} that asynchronously issues submitted 49 * (non-null) items to current subscribers until it is closed. Each 50 * current subscriber receives newly submitted items in the same order 51 * unless drops or exceptions are encountered. Using a 52 * SubmissionPublisher allows item generators to act as compliant <a 53 * href="http://www.reactive-streams.org/"> reactive-streams</a> 54 * Publishers relying on drop handling and/or blocking for flow 55 * control. 56 * 57 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its 58 * constructor for delivery to subscribers. The best choice of 59 * Executor depends on expected usage. If the generator(s) of 60 * submitted items run in separate threads, and the number of 61 * subscribers can be estimated, consider using a {@link 62 * Executors#newFixedThreadPool}. Otherwise consider using the 63 * default, normally the {@link ForkJoinPool#commonPool}. 64 * 65 * <p>Buffering allows producers and consumers to transiently operate 66 * at different rates. Each subscriber uses an independent buffer. 67 * Buffers are created upon first use and expanded as needed up to the 68 * given maximum. (The enforced capacity may be rounded up to the 69 * nearest power of two and/or bounded by the largest value supported 70 * by this implementation.) Invocations of {@link 71 * Flow.Subscription#request(long) request} do not directly result in 72 * buffer expansion, but risk saturation if unfilled requests exceed 73 * the maximum capacity. The default value of {@link 74 * Flow#defaultBufferSize()} may provide a useful starting point for 75 * choosing a capacity based on expected rates, resources, and usages. 76 * 77 * <p>Publication methods support different policies about what to do 78 * when buffers are saturated. Method {@link #submit(Object) submit} 79 * blocks until resources are available. This is simplest, but least 80 * responsive. The {@code offer} methods may drop items (either 81 * immediately or with bounded timeout), but provide an opportunity to 82 * interpose a handler and then retry. 83 * 84 * <p>If any Subscriber method throws an exception, its subscription 85 * is cancelled. If a handler is supplied as a constructor argument, 86 * it is invoked before cancellation upon an exception in method 87 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods 88 * {@link Flow.Subscriber#onSubscribe onSubscribe}, 89 * {@link Flow.Subscriber#onError(Throwable) onError} and 90 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or 91 * handled before cancellation. If the supplied Executor throws 92 * {@link RejectedExecutionException} (or any other RuntimeException 93 * or Error) when attempting to execute a task, or a drop handler 94 * throws an exception when processing a dropped item, then the 95 * exception is rethrown. In these cases, not all subscribers will 96 * have been issued the published item. It is usually good practice to 97 * {@link #closeExceptionally closeExceptionally} in these cases. 98 * 99 * <p>Method {@link #consume(Consumer)} simplifies support for a 100 * common case in which the only action of a subscriber is to request 101 * and process all items using a supplied function. 102 * 103 * <p>This class may also serve as a convenient base for subclasses 104 * that generate items, and use the methods in this class to publish 105 * them. For example here is a class that periodically publishes the 106 * items generated from a supplier. (In practice you might add methods 107 * to independently start and stop generation, to share Executors 108 * among publishers, and so on, or use a SubmissionPublisher as a 109 * component rather than a superclass.) 110 * 111 * <pre> {@code 112 * class PeriodicPublisher<T> extends SubmissionPublisher<T> { 113 * final ScheduledFuture<?> periodicTask; 114 * final ScheduledExecutorService scheduler; 115 * PeriodicPublisher(Executor executor, int maxBufferCapacity, 116 * Supplier<? extends T> supplier, 117 * long period, TimeUnit unit) { 118 * super(executor, maxBufferCapacity); 119 * scheduler = new ScheduledThreadPoolExecutor(1); 120 * periodicTask = scheduler.scheduleAtFixedRate( 121 * () -> submit(supplier.get()), 0, period, unit); 122 * } 123 * public void close() { 124 * periodicTask.cancel(false); 125 * scheduler.shutdown(); 126 * super.close(); 127 * } 128 * }}</pre> 129 * 130 * <p>Here is an example of a {@link Flow.Processor} implementation. 131 * It uses single-step requests to its publisher for simplicity of 132 * illustration. A more adaptive version could monitor flow using the 133 * lag estimate returned from {@code submit}, along with other utility 134 * methods. 135 * 136 * <pre> {@code 137 * class TransformProcessor<S,T> extends SubmissionPublisher<T> 138 * implements Flow.Processor<S,T> { 139 * final Function<? super S, ? extends T> function; 140 * Flow.Subscription subscription; 141 * TransformProcessor(Executor executor, int maxBufferCapacity, 142 * Function<? super S, ? extends T> function) { 143 * super(executor, maxBufferCapacity); 144 * this.function = function; 145 * } 146 * public void onSubscribe(Flow.Subscription subscription) { 147 * (this.subscription = subscription).request(1); 148 * } 149 * public void onNext(S item) { 150 * subscription.request(1); 151 * submit(function.apply(item)); 152 * } 153 * public void onError(Throwable ex) { closeExceptionally(ex); } 154 * public void onComplete() { close(); } 155 * }}</pre> 156 * 157 * @param <T> the published item type 158 * @author Doug Lea 159 * @since 9 160 */ 161public class SubmissionPublisher<T> implements Flow.Publisher<T>, 162 AutoCloseable { 163 /* 164 * Most mechanics are handled by BufferedSubscription. This class 165 * mainly tracks subscribers and ensures sequentiality, by using 166 * built-in synchronization locks across public methods. (Using 167 * built-in locks works well in the most typical case in which 168 * only one thread submits items). 169 */ 170 171 /** The largest possible power of two array size. */ 172 static final int BUFFER_CAPACITY_LIMIT = 1 << 30; 173 174 /** Round capacity to power of 2, at most limit. */ 175 static final int roundCapacity(int cap) { 176 int n = cap - 1; 177 n |= n >>> 1; 178 n |= n >>> 2; 179 n |= n >>> 4; 180 n |= n >>> 8; 181 n |= n >>> 16; 182 return (n <= 0) ? 1 : // at least 1 183 (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1; 184 } 185 186 // default Executor setup; nearly the same as CompletableFuture 187 188 /** 189 * Default executor -- ForkJoinPool.commonPool() unless it cannot 190 * support parallelism. 191 */ 192 private static final Executor ASYNC_POOL = 193 (ForkJoinPool.getCommonPoolParallelism() > 1) ? 194 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 195 196 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 197 private static final class ThreadPerTaskExecutor implements Executor { 198 ThreadPerTaskExecutor() {} // prevent access constructor creation 199 public void execute(Runnable r) { new Thread(r).start(); } 200 } 201 202 /** 203 * Clients (BufferedSubscriptions) are maintained in a linked list 204 * (via their "next" fields). This works well for publish loops. 205 * It requires O(n) traversal to check for duplicate subscribers, 206 * but we expect that subscribing is much less common than 207 * publishing. Unsubscribing occurs only during traversal loops, 208 * when BufferedSubscription methods return negative values 209 * signifying that they have been disabled. To reduce 210 * head-of-line blocking, submit and offer methods first call 211 * BufferedSubscription.offer on each subscriber, and place 212 * saturated ones in retries list (using nextRetry field), and 213 * retry, possibly blocking or dropping. 214 */ 215 BufferedSubscription<T> clients; 216 217 /** Run status, updated only within locks */ 218 volatile boolean closed; 219 /** If non-null, the exception in closeExceptionally */ 220 volatile Throwable closedException; 221 222 // Parameters for constructing BufferedSubscriptions 223 final Executor executor; 224 final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler; 225 final int maxBufferCapacity; 226 227 /** 228 * Creates a new SubmissionPublisher using the given Executor for 229 * async delivery to subscribers, with the given maximum buffer size 230 * for each subscriber, and, if non-null, the given handler invoked 231 * when any Subscriber throws an exception in method {@link 232 * Flow.Subscriber#onNext(Object) onNext}. 233 * 234 * @param executor the executor to use for async delivery, 235 * supporting creation of at least one independent thread 236 * @param maxBufferCapacity the maximum capacity for each 237 * subscriber's buffer (the enforced capacity may be rounded up to 238 * the nearest power of two and/or bounded by the largest value 239 * supported by this implementation; method {@link #getMaxBufferCapacity} 240 * returns the actual value) 241 * @param handler if non-null, procedure to invoke upon exception 242 * thrown in method {@code onNext} 243 * @throws NullPointerException if executor is null 244 * @throws IllegalArgumentException if maxBufferCapacity not 245 * positive 246 */ 247 public SubmissionPublisher(Executor executor, int maxBufferCapacity, 248 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) { 249 if (executor == null) 250 throw new NullPointerException(); 251 if (maxBufferCapacity <= 0) 252 throw new IllegalArgumentException("capacity must be positive"); 253 this.executor = executor; 254 this.onNextHandler = handler; 255 this.maxBufferCapacity = roundCapacity(maxBufferCapacity); 256 } 257 258 /** 259 * Creates a new SubmissionPublisher using the given Executor for 260 * async delivery to subscribers, with the given maximum buffer size 261 * for each subscriber, and no handler for Subscriber exceptions in 262 * method {@link Flow.Subscriber#onNext(Object) onNext}. 263 * 264 * @param executor the executor to use for async delivery, 265 * supporting creation of at least one independent thread 266 * @param maxBufferCapacity the maximum capacity for each 267 * subscriber's buffer (the enforced capacity may be rounded up to 268 * the nearest power of two and/or bounded by the largest value 269 * supported by this implementation; method {@link #getMaxBufferCapacity} 270 * returns the actual value) 271 * @throws NullPointerException if executor is null 272 * @throws IllegalArgumentException if maxBufferCapacity not 273 * positive 274 */ 275 public SubmissionPublisher(Executor executor, int maxBufferCapacity) { 276 this(executor, maxBufferCapacity, null); 277 } 278 279 /** 280 * Creates a new SubmissionPublisher using the {@link 281 * ForkJoinPool#commonPool()} for async delivery to subscribers 282 * (unless it does not support a parallelism level of at least two, 283 * in which case, a new Thread is created to run each task), with 284 * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no 285 * handler for Subscriber exceptions in method {@link 286 * Flow.Subscriber#onNext(Object) onNext}. 287 */ 288 public SubmissionPublisher() { 289 this(ASYNC_POOL, Flow.defaultBufferSize(), null); 290 } 291 292 /** 293 * Adds the given Subscriber unless already subscribed. If already 294 * subscribed, the Subscriber's {@link 295 * Flow.Subscriber#onError(Throwable) onError} method is invoked on 296 * the existing subscription with an {@link IllegalStateException}. 297 * Otherwise, upon success, the Subscriber's {@link 298 * Flow.Subscriber#onSubscribe onSubscribe} method is invoked 299 * asynchronously with a new {@link Flow.Subscription}. If {@link 300 * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the 301 * subscription is cancelled. Otherwise, if this SubmissionPublisher 302 * was closed exceptionally, then the subscriber's {@link 303 * Flow.Subscriber#onError onError} method is invoked with the 304 * corresponding exception, or if closed without exception, the 305 * subscriber's {@link Flow.Subscriber#onComplete() onComplete} 306 * method is invoked. Subscribers may enable receiving items by 307 * invoking the {@link Flow.Subscription#request(long) request} 308 * method of the new Subscription, and may unsubscribe by invoking 309 * its {@link Flow.Subscription#cancel() cancel} method. 310 * 311 * @param subscriber the subscriber 312 * @throws NullPointerException if subscriber is null 313 */ 314 public void subscribe(Flow.Subscriber<? super T> subscriber) { 315 if (subscriber == null) throw new NullPointerException(); 316 BufferedSubscription<T> subscription = 317 new BufferedSubscription<T>(subscriber, executor, 318 onNextHandler, maxBufferCapacity); 319 synchronized (this) { 320 for (BufferedSubscription<T> b = clients, pred = null;;) { 321 if (b == null) { 322 Throwable ex; 323 subscription.onSubscribe(); 324 if ((ex = closedException) != null) 325 subscription.onError(ex); 326 else if (closed) 327 subscription.onComplete(); 328 else if (pred == null) 329 clients = subscription; 330 else 331 pred.next = subscription; 332 break; 333 } 334 BufferedSubscription<T> next = b.next; 335 if (b.isDisabled()) { // remove 336 b.next = null; // detach 337 if (pred == null) 338 clients = next; 339 else 340 pred.next = next; 341 } 342 else if (subscriber.equals(b.subscriber)) { 343 b.onError(new IllegalStateException("Duplicate subscribe")); 344 break; 345 } 346 else 347 pred = b; 348 b = next; 349 } 350 } 351 } 352 353 /** 354 * Publishes the given item to each current subscriber by 355 * asynchronously invoking its {@link Flow.Subscriber#onNext(Object) 356 * onNext} method, blocking uninterruptibly while resources for any 357 * subscriber are unavailable. This method returns an estimate of 358 * the maximum lag (number of items submitted but not yet consumed) 359 * among all current subscribers. This value is at least one 360 * (accounting for this submitted item) if there are any 361 * subscribers, else zero. 362 * 363 * <p>If the Executor for this publisher throws a 364 * RejectedExecutionException (or any other RuntimeException or 365 * Error) when attempting to asynchronously notify subscribers, 366 * then this exception is rethrown, in which case not all 367 * subscribers will have been issued this item. 368 * 369 * @param item the (non-null) item to publish 370 * @return the estimated maximum lag among subscribers 371 * @throws IllegalStateException if closed 372 * @throws NullPointerException if item is null 373 * @throws RejectedExecutionException if thrown by Executor 374 */ 375 public int submit(T item) { 376 if (item == null) throw new NullPointerException(); 377 int lag = 0; 378 boolean complete; 379 synchronized (this) { 380 complete = closed; 381 BufferedSubscription<T> b = clients; 382 if (!complete) { 383 BufferedSubscription<T> pred = null, r = null, rtail = null; 384 while (b != null) { 385 BufferedSubscription<T> next = b.next; 386 int stat = b.offer(item); 387 if (stat < 0) { // disabled 388 b.next = null; 389 if (pred == null) 390 clients = next; 391 else 392 pred.next = next; 393 } 394 else { 395 if (stat > lag) 396 lag = stat; 397 else if (stat == 0) { // place on retry list 398 b.nextRetry = null; 399 if (rtail == null) 400 r = b; 401 else 402 rtail.nextRetry = b; 403 rtail = b; 404 } 405 pred = b; 406 } 407 b = next; 408 } 409 while (r != null) { 410 BufferedSubscription<T> nextRetry = r.nextRetry; 411 r.nextRetry = null; 412 int stat = r.submit(item); 413 if (stat > lag) 414 lag = stat; 415 else if (stat < 0 && clients == r) 416 clients = r.next; // postpone internal unsubscribes 417 r = nextRetry; 418 } 419 } 420 } 421 if (complete) 422 throw new IllegalStateException("Closed"); 423 else 424 return lag; 425 } 426 427 /** 428 * Publishes the given item, if possible, to each current subscriber 429 * by asynchronously invoking its {@link 430 * Flow.Subscriber#onNext(Object) onNext} method. The item may be 431 * dropped by one or more subscribers if resource limits are 432 * exceeded, in which case the given handler (if non-null) is 433 * invoked, and if it returns true, retried once. Other calls to 434 * methods in this class by other threads are blocked while the 435 * handler is invoked. Unless recovery is assured, options are 436 * usually limited to logging the error and/or issuing an {@link 437 * Flow.Subscriber#onError(Throwable) onError} signal to the 438 * subscriber. 439 * 440 * <p>This method returns a status indicator: If negative, it 441 * represents the (negative) number of drops (failed attempts to 442 * issue the item to a subscriber). Otherwise it is an estimate of 443 * the maximum lag (number of items submitted but not yet 444 * consumed) among all current subscribers. This value is at least 445 * one (accounting for this submitted item) if there are any 446 * subscribers, else zero. 447 * 448 * <p>If the Executor for this publisher throws a 449 * RejectedExecutionException (or any other RuntimeException or 450 * Error) when attempting to asynchronously notify subscribers, or 451 * the drop handler throws an exception when processing a dropped 452 * item, then this exception is rethrown. 453 * 454 * @param item the (non-null) item to publish 455 * @param onDrop if non-null, the handler invoked upon a drop to a 456 * subscriber, with arguments of the subscriber and item; if it 457 * returns true, an offer is re-attempted (once) 458 * @return if negative, the (negative) number of drops; otherwise 459 * an estimate of maximum lag 460 * @throws IllegalStateException if closed 461 * @throws NullPointerException if item is null 462 * @throws RejectedExecutionException if thrown by Executor 463 */ 464 public int offer(T item, 465 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) { 466 return doOffer(0L, item, onDrop); 467 } 468 469 /** 470 * Publishes the given item, if possible, to each current subscriber 471 * by asynchronously invoking its {@link 472 * Flow.Subscriber#onNext(Object) onNext} method, blocking while 473 * resources for any subscription are unavailable, up to the 474 * specified timeout or until the caller thread is interrupted, at 475 * which point the given handler (if non-null) is invoked, and if it 476 * returns true, retried once. (The drop handler may distinguish 477 * timeouts from interrupts by checking whether the current thread 478 * is interrupted.) Other calls to methods in this class by other 479 * threads are blocked while the handler is invoked. Unless 480 * recovery is assured, options are usually limited to logging the 481 * error and/or issuing an {@link Flow.Subscriber#onError(Throwable) 482 * onError} signal to the subscriber. 483 * 484 * <p>This method returns a status indicator: If negative, it 485 * represents the (negative) number of drops (failed attempts to 486 * issue the item to a subscriber). Otherwise it is an estimate of 487 * the maximum lag (number of items submitted but not yet 488 * consumed) among all current subscribers. This value is at least 489 * one (accounting for this submitted item) if there are any 490 * subscribers, else zero. 491 * 492 * <p>If the Executor for this publisher throws a 493 * RejectedExecutionException (or any other RuntimeException or 494 * Error) when attempting to asynchronously notify subscribers, or 495 * the drop handler throws an exception when processing a dropped 496 * item, then this exception is rethrown. 497 * 498 * @param item the (non-null) item to publish 499 * @param timeout how long to wait for resources for any subscriber 500 * before giving up, in units of {@code unit} 501 * @param unit a {@code TimeUnit} determining how to interpret the 502 * {@code timeout} parameter 503 * @param onDrop if non-null, the handler invoked upon a drop to a 504 * subscriber, with arguments of the subscriber and item; if it 505 * returns true, an offer is re-attempted (once) 506 * @return if negative, the (negative) number of drops; otherwise 507 * an estimate of maximum lag 508 * @throws IllegalStateException if closed 509 * @throws NullPointerException if item is null 510 * @throws RejectedExecutionException if thrown by Executor 511 */ 512 public int offer(T item, long timeout, TimeUnit unit, 513 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) { 514 return doOffer(unit.toNanos(timeout), item, onDrop); 515 } 516 517 /** Common implementation for both forms of offer */ 518 final int doOffer(long nanos, T item, 519 BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) { 520 if (item == null) throw new NullPointerException(); 521 int lag = 0, drops = 0; 522 boolean complete; 523 synchronized (this) { 524 complete = closed; 525 BufferedSubscription<T> b = clients; 526 if (!complete) { 527 BufferedSubscription<T> pred = null, r = null, rtail = null; 528 while (b != null) { 529 BufferedSubscription<T> next = b.next; 530 int stat = b.offer(item); 531 if (stat < 0) { 532 b.next = null; 533 if (pred == null) 534 clients = next; 535 else 536 pred.next = next; 537 } 538 else { 539 if (stat > lag) 540 lag = stat; 541 else if (stat == 0) { 542 b.nextRetry = null; 543 if (rtail == null) 544 r = b; 545 else 546 rtail.nextRetry = b; 547 rtail = b; 548 } 549 else if (stat > lag) 550 lag = stat; 551 pred = b; 552 } 553 b = next; 554 } 555 while (r != null) { 556 BufferedSubscription<T> nextRetry = r.nextRetry; 557 r.nextRetry = null; 558 int stat = (nanos > 0L) 559 ? r.timedOffer(item, nanos) 560 : r.offer(item); 561 if (stat == 0 && onDrop != null && 562 onDrop.test(r.subscriber, item)) 563 stat = r.offer(item); 564 if (stat == 0) 565 ++drops; 566 else if (stat > lag) 567 lag = stat; 568 else if (stat < 0 && clients == r) 569 clients = r.next; 570 r = nextRetry; 571 } 572 } 573 } 574 if (complete) 575 throw new IllegalStateException("Closed"); 576 else 577 return (drops > 0) ? -drops : lag; 578 } 579 580 /** 581 * Unless already closed, issues {@link 582 * Flow.Subscriber#onComplete() onComplete} signals to current 583 * subscribers, and disallows subsequent attempts to publish. 584 * Upon return, this method does <em>NOT</em> guarantee that all 585 * subscribers have yet completed. 586 */ 587 public void close() { 588 if (!closed) { 589 BufferedSubscription<T> b; 590 synchronized (this) { 591 // no need to re-check closed here 592 b = clients; 593 clients = null; 594 closed = true; 595 } 596 while (b != null) { 597 BufferedSubscription<T> next = b.next; 598 b.next = null; 599 b.onComplete(); 600 b = next; 601 } 602 } 603 } 604 605 /** 606 * Unless already closed, issues {@link 607 * Flow.Subscriber#onError(Throwable) onError} signals to current 608 * subscribers with the given error, and disallows subsequent 609 * attempts to publish. Future subscribers also receive the given 610 * error. Upon return, this method does <em>NOT</em> guarantee 611 * that all subscribers have yet completed. 612 * 613 * @param error the {@code onError} argument sent to subscribers 614 * @throws NullPointerException if error is null 615 */ 616 public void closeExceptionally(Throwable error) { 617 if (error == null) 618 throw new NullPointerException(); 619 if (!closed) { 620 BufferedSubscription<T> b; 621 synchronized (this) { 622 b = clients; 623 if (!closed) { // don't clobber racing close 624 clients = null; 625 closedException = error; 626 closed = true; 627 } 628 } 629 while (b != null) { 630 BufferedSubscription<T> next = b.next; 631 b.next = null; 632 b.onError(error); 633 b = next; 634 } 635 } 636 } 637 638 /** 639 * Returns true if this publisher is not accepting submissions. 640 * 641 * @return true if closed 642 */ 643 public boolean isClosed() { 644 return closed; 645 } 646 647 /** 648 * Returns the exception associated with {@link 649 * #closeExceptionally(Throwable) closeExceptionally}, or null if 650 * not closed or if closed normally. 651 * 652 * @return the exception, or null if none 653 */ 654 public Throwable getClosedException() { 655 return closedException; 656 } 657 658 /** 659 * Returns true if this publisher has any subscribers. 660 * 661 * @return true if this publisher has any subscribers 662 */ 663 public boolean hasSubscribers() { 664 boolean nonEmpty = false; 665 if (!closed) { 666 synchronized (this) { 667 for (BufferedSubscription<T> b = clients; b != null;) { 668 BufferedSubscription<T> next = b.next; 669 if (b.isDisabled()) { 670 b.next = null; 671 b = clients = next; 672 } 673 else { 674 nonEmpty = true; 675 break; 676 } 677 } 678 } 679 } 680 return nonEmpty; 681 } 682 683 /** 684 * Returns the number of current subscribers. 685 * 686 * @return the number of current subscribers 687 */ 688 public int getNumberOfSubscribers() { 689 int count = 0; 690 if (!closed) { 691 synchronized (this) { 692 BufferedSubscription<T> pred = null, next; 693 for (BufferedSubscription<T> b = clients; b != null; b = next) { 694 next = b.next; 695 if (b.isDisabled()) { 696 b.next = null; 697 if (pred == null) 698 clients = next; 699 else 700 pred.next = next; 701 } 702 else { 703 pred = b; 704 ++count; 705 } 706 } 707 } 708 } 709 return count; 710 } 711 712 /** 713 * Returns the Executor used for asynchronous delivery. 714 * 715 * @return the Executor used for asynchronous delivery 716 */ 717 public Executor getExecutor() { 718 return executor; 719 } 720 721 /** 722 * Returns the maximum per-subscriber buffer capacity. 723 * 724 * @return the maximum per-subscriber buffer capacity 725 */ 726 public int getMaxBufferCapacity() { 727 return maxBufferCapacity; 728 } 729 730 /** 731 * Returns a list of current subscribers for monitoring and 732 * tracking purposes, not for invoking {@link Flow.Subscriber} 733 * methods on the subscribers. 734 * 735 * @return list of current subscribers 736 */ 737 public List<Flow.Subscriber<? super T>> getSubscribers() { 738 ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>(); 739 synchronized (this) { 740 BufferedSubscription<T> pred = null, next; 741 for (BufferedSubscription<T> b = clients; b != null; b = next) { 742 next = b.next; 743 if (b.isDisabled()) { 744 b.next = null; 745 if (pred == null) 746 clients = next; 747 else 748 pred.next = next; 749 } 750 else 751 subs.add(b.subscriber); 752 } 753 } 754 return subs; 755 } 756 757 /** 758 * Returns true if the given Subscriber is currently subscribed. 759 * 760 * @param subscriber the subscriber 761 * @return true if currently subscribed 762 * @throws NullPointerException if subscriber is null 763 */ 764 public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) { 765 if (subscriber == null) throw new NullPointerException(); 766 if (!closed) { 767 synchronized (this) { 768 BufferedSubscription<T> pred = null, next; 769 for (BufferedSubscription<T> b = clients; b != null; b = next) { 770 next = b.next; 771 if (b.isDisabled()) { 772 b.next = null; 773 if (pred == null) 774 clients = next; 775 else 776 pred.next = next; 777 } 778 else if (subscriber.equals(b.subscriber)) 779 return true; 780 else 781 pred = b; 782 } 783 } 784 } 785 return false; 786 } 787 788 /** 789 * Returns an estimate of the minimum number of items requested 790 * (via {@link Flow.Subscription#request(long) request}) but not 791 * yet produced, among all current subscribers. 792 * 793 * @return the estimate, or zero if no subscribers 794 */ 795 public long estimateMinimumDemand() { 796 long min = Long.MAX_VALUE; 797 boolean nonEmpty = false; 798 synchronized (this) { 799 BufferedSubscription<T> pred = null, next; 800 for (BufferedSubscription<T> b = clients; b != null; b = next) { 801 int n; long d; 802 next = b.next; 803 if ((n = b.estimateLag()) < 0) { 804 b.next = null; 805 if (pred == null) 806 clients = next; 807 else 808 pred.next = next; 809 } 810 else { 811 if ((d = b.demand - n) < min) 812 min = d; 813 nonEmpty = true; 814 pred = b; 815 } 816 } 817 } 818 return nonEmpty ? min : 0; 819 } 820 821 /** 822 * Returns an estimate of the maximum number of items produced but 823 * not yet consumed among all current subscribers. 824 * 825 * @return the estimate 826 */ 827 public int estimateMaximumLag() { 828 int max = 0; 829 synchronized (this) { 830 BufferedSubscription<T> pred = null, next; 831 for (BufferedSubscription<T> b = clients; b != null; b = next) { 832 int n; 833 next = b.next; 834 if ((n = b.estimateLag()) < 0) { 835 b.next = null; 836 if (pred == null) 837 clients = next; 838 else 839 pred.next = next; 840 } 841 else { 842 if (n > max) 843 max = n; 844 pred = b; 845 } 846 } 847 } 848 return max; 849 } 850 851 /** 852 * Processes all published items using the given Consumer function. 853 * Returns a CompletableFuture that is completed normally when this 854 * publisher signals {@link Flow.Subscriber#onComplete() 855 * onComplete}, or completed exceptionally upon any error, or an 856 * exception is thrown by the Consumer, or the returned 857 * CompletableFuture is cancelled, in which case no further items 858 * are processed. 859 * 860 * @param consumer the function applied to each onNext item 861 * @return a CompletableFuture that is completed normally 862 * when the publisher signals onComplete, and exceptionally 863 * upon any error or cancellation 864 * @throws NullPointerException if consumer is null 865 */ 866 public CompletableFuture<Void> consume(Consumer<? super T> consumer) { 867 if (consumer == null) 868 throw new NullPointerException(); 869 CompletableFuture<Void> status = new CompletableFuture<>(); 870 subscribe(new ConsumerSubscriber<T>(status, consumer)); 871 return status; 872 } 873 874 /** Subscriber for method consume */ 875 private static final class ConsumerSubscriber<T> 876 implements Flow.Subscriber<T> { 877 final CompletableFuture<Void> status; 878 final Consumer<? super T> consumer; 879 Flow.Subscription subscription; 880 ConsumerSubscriber(CompletableFuture<Void> status, 881 Consumer<? super T> consumer) { 882 this.status = status; this.consumer = consumer; 883 } 884 public final void onSubscribe(Flow.Subscription subscription) { 885 this.subscription = subscription; 886 status.whenComplete((v, e) -> subscription.cancel()); 887 if (!status.isDone()) 888 subscription.request(Long.MAX_VALUE); 889 } 890 public final void onError(Throwable ex) { 891 status.completeExceptionally(ex); 892 } 893 public final void onComplete() { 894 status.complete(null); 895 } 896 public final void onNext(T item) { 897 try { 898 consumer.accept(item); 899 } catch (Throwable ex) { 900 subscription.cancel(); 901 status.completeExceptionally(ex); 902 } 903 } 904 } 905 906 /** 907 * A task for consuming buffer items and signals, created and 908 * executed whenever they become available. A task consumes as 909 * many items/signals as possible before terminating, at which 910 * point another task is created when needed. The dual Runnable 911 * and ForkJoinTask declaration saves overhead when executed by 912 * ForkJoinPools, without impacting other kinds of Executors. 913 */ 914 @SuppressWarnings("serial") 915 static final class ConsumerTask<T> extends ForkJoinTask<Void> 916 implements Runnable, CompletableFuture.AsynchronousCompletionTask { 917 final BufferedSubscription<T> consumer; 918 ConsumerTask(BufferedSubscription<T> consumer) { 919 this.consumer = consumer; 920 } 921 public final Void getRawResult() { return null; } 922 public final void setRawResult(Void v) {} 923 public final boolean exec() { consumer.consume(); return false; } 924 public final void run() { consumer.consume(); } 925 } 926 927 /** 928 * A bounded (ring) buffer with integrated control to start a 929 * consumer task whenever items are available. The buffer 930 * algorithm is similar to one used inside ForkJoinPool (see its 931 * internal documentation for details) specialized for the case of 932 * at most one concurrent producer and consumer, and power of two 933 * buffer sizes. This allows methods to operate without locks even 934 * while supporting resizing, blocking, task-triggering, and 935 * garbage-free buffers (nulling out elements when consumed), 936 * although supporting these does impose a bit of overhead 937 * compared to plain fixed-size ring buffers. 938 * 939 * The publisher guarantees a single producer via its lock. We 940 * ensure in this class that there is at most one consumer. The 941 * request and cancel methods must be fully thread-safe but are 942 * coded to exploit the most common case in which they are only 943 * called by consumers (usually within onNext). 944 * 945 * Execution control is managed using the ACTIVE ctl bit. We 946 * ensure that a task is active when consumable items (and 947 * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and 948 * there is demand (unfilled requests). This is complicated on 949 * the creation side by the possibility of exceptions when trying 950 * to execute tasks. These eventually force DISABLED state, but 951 * sometimes not directly. On the task side, termination (clearing 952 * ACTIVE) that would otherwise race with producers or request() 953 * calls uses the CONSUME keep-alive bit to force a recheck. 954 * 955 * The ctl field also manages run state. When DISABLED, no further 956 * updates are possible. Disabling may be preceded by setting 957 * ERROR or COMPLETE (or both -- ERROR has precedence), in which 958 * case the associated Subscriber methods are invoked, possibly 959 * synchronously if there is no active consumer task (including 960 * cases where execute() failed). The cancel() method is supported 961 * by treating as ERROR but suppressing onError signal. 962 * 963 * Support for blocking also exploits the fact that there is only 964 * one possible waiter. ManagedBlocker-compatible control fields 965 * are placed in this class itself rather than in wait-nodes. 966 * Blocking control relies on the "waiter" field. Producers set 967 * the field before trying to block, but must then recheck (via 968 * offer) before parking. Signalling then just unparks and clears 969 * waiter field. If the producer and/or consumer are using a 970 * ForkJoinPool, the producer attempts to help run consumer tasks 971 * via ForkJoinPool.helpAsyncBlocker before blocking. 972 * 973 * This class uses @Contended and heuristic field declaration 974 * ordering to reduce false-sharing-based memory contention among 975 * instances of BufferedSubscription, but it does not currently 976 * attempt to avoid memory contention among buffers. This field 977 * and element packing can hurt performance especially when each 978 * publisher has only one client operating at a high rate. 979 * Addressing this may require allocating substantially more space 980 * than users expect. 981 */ 982 @SuppressWarnings("serial") 983 @jdk.internal.vm.annotation.Contended 984 private static final class BufferedSubscription<T> 985 implements Flow.Subscription, ForkJoinPool.ManagedBlocker { 986 // Order-sensitive field declarations 987 long timeout; // > 0 if timed wait 988 volatile long demand; // # unfilled requests 989 int maxCapacity; // reduced on OOME 990 int putStat; // offer result for ManagedBlocker 991 volatile int ctl; // atomic run state flags 992 volatile int head; // next position to take 993 int tail; // next position to put 994 Object[] array; // buffer: null if disabled 995 Flow.Subscriber<? super T> subscriber; // null if disabled 996 Executor executor; // null if disabled 997 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler; 998 volatile Throwable pendingError; // holds until onError issued 999 volatile Thread waiter; // blocked producer thread 1000 T putItem; // for offer within ManagedBlocker 1001 BufferedSubscription<T> next; // used only by publisher 1002 BufferedSubscription<T> nextRetry; // used only by publisher 1003 1004 // ctl values 1005 static final int ACTIVE = 0x01; // consumer task active 1006 static final int CONSUME = 0x02; // keep-alive for consumer task 1007 static final int DISABLED = 0x04; // final state 1008 static final int ERROR = 0x08; // signal onError then disable 1009 static final int SUBSCRIBE = 0x10; // signal onSubscribe 1010 static final int COMPLETE = 0x20; // signal onComplete when done 1011 1012 static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel 1013 1014 /** 1015 * Initial buffer capacity used when maxBufferCapacity is 1016 * greater. Must be a power of two. 1017 */ 1018 static final int DEFAULT_INITIAL_CAP = 32; 1019 1020 BufferedSubscription(Flow.Subscriber<? super T> subscriber, 1021 Executor executor, 1022 BiConsumer<? super Flow.Subscriber<? super T>, 1023 ? super Throwable> onNextHandler, 1024 int maxBufferCapacity) { 1025 this.subscriber = subscriber; 1026 this.executor = executor; 1027 this.onNextHandler = onNextHandler; 1028 this.maxCapacity = maxBufferCapacity; 1029 this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ? 1030 (maxBufferCapacity < 2 ? // at least 2 slots 1031 2 : maxBufferCapacity) : 1032 DEFAULT_INITIAL_CAP]; 1033 } 1034 1035 final boolean isDisabled() { 1036 return ctl == DISABLED; 1037 } 1038 1039 /** 1040 * Returns estimated number of buffered items, or -1 if 1041 * disabled. 1042 */ 1043 final int estimateLag() { 1044 int n; 1045 return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0; 1046 } 1047 1048 /** 1049 * Tries to add item and start consumer task if necessary. 1050 * @return -1 if disabled, 0 if dropped, else estimated lag 1051 */ 1052 final int offer(T item) { 1053 int h = head, t = tail, cap, size, stat; 1054 Object[] a = array; 1055 if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) { 1056 a[(cap - 1) & t] = item; // relaxed writes OK 1057 tail = t + 1; 1058 stat = size; 1059 } 1060 else 1061 stat = growAndAdd(a, item); 1062 return (stat > 0 && 1063 (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ? 1064 startOnOffer(stat) : stat; 1065 } 1066 1067 /** 1068 * Tries to create or expand buffer, then adds item if possible. 1069 */ 1070 private int growAndAdd(Object[] a, T item) { 1071 boolean alloc; 1072 int cap, stat; 1073 if ((ctl & (ERROR | DISABLED)) != 0) { 1074 cap = 0; 1075 stat = -1; 1076 alloc = false; 1077 } 1078 else if (a == null || (cap = a.length) <= 0) { 1079 cap = 0; 1080 stat = 1; 1081 alloc = true; 1082 } 1083 else { 1084 VarHandle.fullFence(); // recheck 1085 int h = head, t = tail, size = t + 1 - h; 1086 if (cap >= size) { 1087 a[(cap - 1) & t] = item; 1088 tail = t + 1; 1089 stat = size; 1090 alloc = false; 1091 } 1092 else if (cap >= maxCapacity) { 1093 stat = 0; // cannot grow 1094 alloc = false; 1095 } 1096 else { 1097 stat = cap + 1; 1098 alloc = true; 1099 } 1100 } 1101 if (alloc) { 1102 int newCap = (cap > 0) ? cap << 1 : 1; 1103 if (newCap <= cap) 1104 stat = 0; 1105 else { 1106 Object[] newArray = null; 1107 try { 1108 newArray = new Object[newCap]; 1109 } catch (Throwable ex) { // try to cope with OOME 1110 } 1111 if (newArray == null) { 1112 if (cap > 0) 1113 maxCapacity = cap; // avoid continuous failure 1114 stat = 0; 1115 } 1116 else { 1117 array = newArray; 1118 int t = tail; 1119 int newMask = newCap - 1; 1120 if (a != null && cap > 0) { 1121 int mask = cap - 1; 1122 for (int j = head; j != t; ++j) { 1123 int k = j & mask; 1124 Object x = QA.getAcquire(a, k); 1125 if (x != null && // races with consumer 1126 QA.compareAndSet(a, k, x, null)) 1127 newArray[j & newMask] = x; 1128 } 1129 } 1130 newArray[t & newMask] = item; 1131 tail = t + 1; 1132 } 1133 } 1134 } 1135 return stat; 1136 } 1137 1138 /** 1139 * Spins/helps/blocks while offer returns 0. Called only if 1140 * initial offer return 0. 1141 */ 1142 final int submit(T item) { 1143 int stat; 1144 if ((stat = offer(item)) == 0) { 1145 putItem = item; 1146 timeout = 0L; 1147 putStat = 0; 1148 ForkJoinPool.helpAsyncBlocker(executor, this); 1149 if ((stat = putStat) == 0) { 1150 try { 1151 ForkJoinPool.managedBlock(this); 1152 } catch (InterruptedException ie) { 1153 timeout = INTERRUPTED; 1154 } 1155 stat = putStat; 1156 } 1157 if (timeout < 0L) 1158 Thread.currentThread().interrupt(); 1159 } 1160 return stat; 1161 } 1162 1163 /** 1164 * Timeout version; similar to submit. 1165 */ 1166 final int timedOffer(T item, long nanos) { 1167 int stat; 1168 if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) { 1169 putItem = item; 1170 putStat = 0; 1171 ForkJoinPool.helpAsyncBlocker(executor, this); 1172 if ((stat = putStat) == 0) { 1173 try { 1174 ForkJoinPool.managedBlock(this); 1175 } catch (InterruptedException ie) { 1176 timeout = INTERRUPTED; 1177 } 1178 stat = putStat; 1179 } 1180 if (timeout < 0L) 1181 Thread.currentThread().interrupt(); 1182 } 1183 return stat; 1184 } 1185 1186 /** 1187 * Tries to start consumer task after offer. 1188 * @return -1 if now disabled, else argument 1189 */ 1190 private int startOnOffer(int stat) { 1191 for (;;) { 1192 Executor e; int c; 1193 if ((c = ctl) == DISABLED || (e = executor) == null) { 1194 stat = -1; 1195 break; 1196 } 1197 else if ((c & ACTIVE) != 0) { // ensure keep-alive 1198 if ((c & CONSUME) != 0 || 1199 CTL.compareAndSet(this, c, c | CONSUME)) 1200 break; 1201 } 1202 else if (demand == 0L || tail == head) 1203 break; 1204 else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) { 1205 try { 1206 e.execute(new ConsumerTask<T>(this)); 1207 break; 1208 } catch (RuntimeException | Error ex) { // back out 1209 do {} while (((c = ctl) & DISABLED) == 0 && 1210 (c & ACTIVE) != 0 && 1211 !CTL.weakCompareAndSet 1212 (this, c, c & ~ACTIVE)); 1213 throw ex; 1214 } 1215 } 1216 } 1217 return stat; 1218 } 1219 1220 private void signalWaiter(Thread w) { 1221 waiter = null; 1222 LockSupport.unpark(w); // release producer 1223 } 1224 1225 /** 1226 * Nulls out most fields, mainly to avoid garbage retention 1227 * until publisher unsubscribes, but also to help cleanly stop 1228 * upon error by nulling required components. 1229 */ 1230 private void detach() { 1231 Thread w = waiter; 1232 executor = null; 1233 subscriber = null; 1234 pendingError = null; 1235 signalWaiter(w); 1236 } 1237 1238 /** 1239 * Issues error signal, asynchronously if a task is running, 1240 * else synchronously. 1241 */ 1242 final void onError(Throwable ex) { 1243 for (int c;;) { 1244 if (((c = ctl) & (ERROR | DISABLED)) != 0) 1245 break; 1246 else if ((c & ACTIVE) != 0) { 1247 pendingError = ex; 1248 if (CTL.compareAndSet(this, c, c | ERROR)) 1249 break; // cause consumer task to exit 1250 } 1251 else if (CTL.compareAndSet(this, c, DISABLED)) { 1252 Flow.Subscriber<? super T> s = subscriber; 1253 if (s != null && ex != null) { 1254 try { 1255 s.onError(ex); 1256 } catch (Throwable ignore) { 1257 } 1258 } 1259 detach(); 1260 break; 1261 } 1262 } 1263 } 1264 1265 /** 1266 * Tries to start consumer task upon a signal or request; 1267 * disables on failure. 1268 */ 1269 private void startOrDisable() { 1270 Executor e; 1271 if ((e = executor) != null) { // skip if already disabled 1272 try { 1273 e.execute(new ConsumerTask<T>(this)); 1274 } catch (Throwable ex) { // back out and force signal 1275 for (int c;;) { 1276 if ((c = ctl) == DISABLED || (c & ACTIVE) == 0) 1277 break; 1278 if (CTL.compareAndSet(this, c, c & ~ACTIVE)) { 1279 onError(ex); 1280 break; 1281 } 1282 } 1283 } 1284 } 1285 } 1286 1287 final void onComplete() { 1288 for (int c;;) { 1289 if ((c = ctl) == DISABLED) 1290 break; 1291 if (CTL.compareAndSet(this, c, 1292 c | (ACTIVE | CONSUME | COMPLETE))) { 1293 if ((c & ACTIVE) == 0) 1294 startOrDisable(); 1295 break; 1296 } 1297 } 1298 } 1299 1300 final void onSubscribe() { 1301 for (int c;;) { 1302 if ((c = ctl) == DISABLED) 1303 break; 1304 if (CTL.compareAndSet(this, c, 1305 c | (ACTIVE | CONSUME | SUBSCRIBE))) { 1306 if ((c & ACTIVE) == 0) 1307 startOrDisable(); 1308 break; 1309 } 1310 } 1311 } 1312 1313 /** 1314 * Causes consumer task to exit if active (without reporting 1315 * onError unless there is already a pending error), and 1316 * disables. 1317 */ 1318 public void cancel() { 1319 for (int c;;) { 1320 if ((c = ctl) == DISABLED) 1321 break; 1322 else if ((c & ACTIVE) != 0) { 1323 if (CTL.compareAndSet(this, c, 1324 c | (CONSUME | ERROR))) 1325 break; 1326 } 1327 else if (CTL.compareAndSet(this, c, DISABLED)) { 1328 detach(); 1329 break; 1330 } 1331 } 1332 } 1333 1334 /** 1335 * Adds to demand and possibly starts task. 1336 */ 1337 public void request(long n) { 1338 if (n > 0L) { 1339 for (;;) { 1340 long prev = demand, d; 1341 if ((d = prev + n) < prev) // saturate 1342 d = Long.MAX_VALUE; 1343 if (DEMAND.compareAndSet(this, prev, d)) { 1344 for (int c, h;;) { 1345 if ((c = ctl) == DISABLED) 1346 break; 1347 else if ((c & ACTIVE) != 0) { 1348 if ((c & CONSUME) != 0 || 1349 CTL.compareAndSet(this, c, c | CONSUME)) 1350 break; 1351 } 1352 else if ((h = head) != tail) { 1353 if (CTL.compareAndSet(this, c, 1354 c | (ACTIVE|CONSUME))) { 1355 startOrDisable(); 1356 break; 1357 } 1358 } 1359 else if (head == h && tail == h) 1360 break; // else stale 1361 if (demand == 0L) 1362 break; 1363 } 1364 break; 1365 } 1366 } 1367 } 1368 else 1369 onError(new IllegalArgumentException( 1370 "non-positive subscription request")); 1371 } 1372 1373 public final boolean isReleasable() { // for ManagedBlocker 1374 T item = putItem; 1375 if (item != null) { 1376 if ((putStat = offer(item)) == 0) 1377 return false; 1378 putItem = null; 1379 } 1380 return true; 1381 } 1382 1383 public final boolean block() { // for ManagedBlocker 1384 T item = putItem; 1385 if (item != null) { 1386 putItem = null; 1387 long nanos = timeout; 1388 long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L; 1389 while ((putStat = offer(item)) == 0) { 1390 if (Thread.interrupted()) { 1391 timeout = INTERRUPTED; 1392 if (nanos > 0L) 1393 break; 1394 } 1395 else if (nanos > 0L && 1396 (nanos = deadline - System.nanoTime()) <= 0L) 1397 break; 1398 else if (waiter == null) 1399 waiter = Thread.currentThread(); 1400 else { 1401 if (nanos > 0L) 1402 LockSupport.parkNanos(this, nanos); 1403 else 1404 LockSupport.park(this); 1405 waiter = null; 1406 } 1407 } 1408 } 1409 waiter = null; 1410 return true; 1411 } 1412 1413 /** 1414 * Consumer loop, called from ConsumerTask, or indirectly 1415 * when helping during submit. 1416 */ 1417 final void consume() { 1418 Flow.Subscriber<? super T> s; 1419 int h = head; 1420 if ((s = subscriber) != null) { // else disabled 1421 for (;;) { 1422 long d = demand; 1423 int c; Object[] a; int n, i; Object x; Thread w; 1424 if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) { 1425 if (!checkControl(s, c)) 1426 break; 1427 } 1428 else if ((a = array) == null || h == tail || 1429 (n = a.length) == 0 || 1430 (x = QA.getAcquire(a, i = (n - 1) & h)) == null) { 1431 if (!checkEmpty(s, c)) 1432 break; 1433 } 1434 else if (d == 0L) { 1435 if (!checkDemand(c)) 1436 break; 1437 } 1438 else if (((c & CONSUME) != 0 || 1439 CTL.compareAndSet(this, c, c | CONSUME)) && 1440 QA.compareAndSet(a, i, x, null)) { 1441 HEAD.setRelease(this, ++h); 1442 DEMAND.getAndAdd(this, -1L); 1443 if ((w = waiter) != null) 1444 signalWaiter(w); 1445 try { 1446 @SuppressWarnings("unchecked") T y = (T) x; 1447 s.onNext(y); 1448 } catch (Throwable ex) { 1449 handleOnNext(s, ex); 1450 } 1451 } 1452 } 1453 } 1454 } 1455 1456 /** 1457 * Responds to control events in consume(). 1458 */ 1459 private boolean checkControl(Flow.Subscriber<? super T> s, int c) { 1460 boolean stat = true; 1461 if ((c & SUBSCRIBE) != 0) { 1462 if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) { 1463 try { 1464 if (s != null) 1465 s.onSubscribe(this); 1466 } catch (Throwable ex) { 1467 onError(ex); 1468 } 1469 } 1470 } 1471 else if ((c & ERROR) != 0) { 1472 Throwable ex = pendingError; 1473 ctl = DISABLED; // no need for CAS 1474 if (ex != null) { // null if errorless cancel 1475 try { 1476 if (s != null) 1477 s.onError(ex); 1478 } catch (Throwable ignore) { 1479 } 1480 } 1481 } 1482 else { 1483 detach(); 1484 stat = false; 1485 } 1486 return stat; 1487 } 1488 1489 /** 1490 * Responds to apparent emptiness in consume(). 1491 */ 1492 private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) { 1493 boolean stat = true; 1494 if (head == tail) { 1495 if ((c & CONSUME) != 0) 1496 CTL.compareAndSet(this, c, c & ~CONSUME); 1497 else if ((c & COMPLETE) != 0) { 1498 if (CTL.compareAndSet(this, c, DISABLED)) { 1499 try { 1500 if (s != null) 1501 s.onComplete(); 1502 } catch (Throwable ignore) { 1503 } 1504 } 1505 } 1506 else if (CTL.compareAndSet(this, c, c & ~ACTIVE)) 1507 stat = false; 1508 } 1509 return stat; 1510 } 1511 1512 /** 1513 * Responds to apparent zero demand in consume(). 1514 */ 1515 private boolean checkDemand(int c) { 1516 boolean stat = true; 1517 if (demand == 0L) { 1518 if ((c & CONSUME) != 0) 1519 CTL.compareAndSet(this, c, c & ~CONSUME); 1520 else if (CTL.compareAndSet(this, c, c & ~ACTIVE)) 1521 stat = false; 1522 } 1523 return stat; 1524 } 1525 1526 /** 1527 * Processes exception in Subscriber.onNext. 1528 */ 1529 private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) { 1530 BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h; 1531 if ((h = onNextHandler) != null) { 1532 try { 1533 h.accept(s, ex); 1534 } catch (Throwable ignore) { 1535 } 1536 } 1537 onError(ex); 1538 } 1539 1540 // VarHandle mechanics 1541 private static final VarHandle CTL; 1542 private static final VarHandle TAIL; 1543 private static final VarHandle HEAD; 1544 private static final VarHandle DEMAND; 1545 private static final VarHandle QA; 1546 1547 static { 1548 try { 1549 MethodHandles.Lookup l = MethodHandles.lookup(); 1550 CTL = l.findVarHandle(BufferedSubscription.class, "ctl", 1551 int.class); 1552 TAIL = l.findVarHandle(BufferedSubscription.class, "tail", 1553 int.class); 1554 HEAD = l.findVarHandle(BufferedSubscription.class, "head", 1555 int.class); 1556 DEMAND = l.findVarHandle(BufferedSubscription.class, "demand", 1557 long.class); 1558 QA = MethodHandles.arrayElementVarHandle(Object[].class); 1559 } catch (ReflectiveOperationException e) { 1560 throw new Error(e); 1561 } 1562 1563 // Reduce the risk of rare disastrous classloading in first call to 1564 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1565 Class<?> ensureLoaded = LockSupport.class; 1566 } 1567 } 1568} 1569