1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37
38import java.lang.invoke.MethodHandles;
39import java.lang.invoke.VarHandle;
40import java.util.ArrayList;
41import java.util.List;
42import java.util.concurrent.locks.LockSupport;
43import java.util.function.BiConsumer;
44import java.util.function.BiPredicate;
45import java.util.function.Consumer;
46
47/**
48 * A {@link Flow.Publisher} that asynchronously issues submitted
49 * (non-null) items to current subscribers until it is closed.  Each
50 * current subscriber receives newly submitted items in the same order
51 * unless drops or exceptions are encountered.  Using a
52 * SubmissionPublisher allows item generators to act as compliant <a
53 * href="http://www.reactive-streams.org/"> reactive-streams</a>
54 * Publishers relying on drop handling and/or blocking for flow
55 * control.
56 *
57 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
58 * constructor for delivery to subscribers. The best choice of
59 * Executor depends on expected usage. If the generator(s) of
60 * submitted items run in separate threads, and the number of
61 * subscribers can be estimated, consider using a {@link
62 * Executors#newFixedThreadPool}. Otherwise consider using the
63 * default, normally the {@link ForkJoinPool#commonPool}.
64 *
65 * <p>Buffering allows producers and consumers to transiently operate
66 * at different rates.  Each subscriber uses an independent buffer.
67 * Buffers are created upon first use and expanded as needed up to the
68 * given maximum. (The enforced capacity may be rounded up to the
69 * nearest power of two and/or bounded by the largest value supported
70 * by this implementation.)  Invocations of {@link
71 * Flow.Subscription#request(long) request} do not directly result in
72 * buffer expansion, but risk saturation if unfilled requests exceed
73 * the maximum capacity.  The default value of {@link
74 * Flow#defaultBufferSize()} may provide a useful starting point for
75 * choosing a capacity based on expected rates, resources, and usages.
76 *
77 * <p>Publication methods support different policies about what to do
78 * when buffers are saturated. Method {@link #submit(Object) submit}
79 * blocks until resources are available. This is simplest, but least
80 * responsive.  The {@code offer} methods may drop items (either
81 * immediately or with bounded timeout), but provide an opportunity to
82 * interpose a handler and then retry.
83 *
84 * <p>If any Subscriber method throws an exception, its subscription
85 * is cancelled.  If a handler is supplied as a constructor argument,
86 * it is invoked before cancellation upon an exception in method
87 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
88 * {@link Flow.Subscriber#onSubscribe onSubscribe},
89 * {@link Flow.Subscriber#onError(Throwable) onError} and
90 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
91 * handled before cancellation.  If the supplied Executor throws
92 * {@link RejectedExecutionException} (or any other RuntimeException
93 * or Error) when attempting to execute a task, or a drop handler
94 * throws an exception when processing a dropped item, then the
95 * exception is rethrown. In these cases, not all subscribers will
96 * have been issued the published item. It is usually good practice to
97 * {@link #closeExceptionally closeExceptionally} in these cases.
98 *
99 * <p>Method {@link #consume(Consumer)} simplifies support for a
100 * common case in which the only action of a subscriber is to request
101 * and process all items using a supplied function.
102 *
103 * <p>This class may also serve as a convenient base for subclasses
104 * that generate items, and use the methods in this class to publish
105 * them.  For example here is a class that periodically publishes the
106 * items generated from a supplier. (In practice you might add methods
107 * to independently start and stop generation, to share Executors
108 * among publishers, and so on, or use a SubmissionPublisher as a
109 * component rather than a superclass.)
110 *
111 * <pre> {@code
112 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
113 *   final ScheduledFuture<?> periodicTask;
114 *   final ScheduledExecutorService scheduler;
115 *   PeriodicPublisher(Executor executor, int maxBufferCapacity,
116 *                     Supplier<? extends T> supplier,
117 *                     long period, TimeUnit unit) {
118 *     super(executor, maxBufferCapacity);
119 *     scheduler = new ScheduledThreadPoolExecutor(1);
120 *     periodicTask = scheduler.scheduleAtFixedRate(
121 *       () -> submit(supplier.get()), 0, period, unit);
122 *   }
123 *   public void close() {
124 *     periodicTask.cancel(false);
125 *     scheduler.shutdown();
126 *     super.close();
127 *   }
128 * }}</pre>
129 *
130 * <p>Here is an example of a {@link Flow.Processor} implementation.
131 * It uses single-step requests to its publisher for simplicity of
132 * illustration. A more adaptive version could monitor flow using the
133 * lag estimate returned from {@code submit}, along with other utility
134 * methods.
135 *
136 * <pre> {@code
137 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
138 *   implements Flow.Processor<S,T> {
139 *   final Function<? super S, ? extends T> function;
140 *   Flow.Subscription subscription;
141 *   TransformProcessor(Executor executor, int maxBufferCapacity,
142 *                      Function<? super S, ? extends T> function) {
143 *     super(executor, maxBufferCapacity);
144 *     this.function = function;
145 *   }
146 *   public void onSubscribe(Flow.Subscription subscription) {
147 *     (this.subscription = subscription).request(1);
148 *   }
149 *   public void onNext(S item) {
150 *     subscription.request(1);
151 *     submit(function.apply(item));
152 *   }
153 *   public void onError(Throwable ex) { closeExceptionally(ex); }
154 *   public void onComplete() { close(); }
155 * }}</pre>
156 *
157 * @param <T> the published item type
158 * @author Doug Lea
159 * @since 9
160 */
161public class SubmissionPublisher<T> implements Flow.Publisher<T>,
162                                               AutoCloseable {
163    /*
164     * Most mechanics are handled by BufferedSubscription. This class
165     * mainly tracks subscribers and ensures sequentiality, by using
166     * built-in synchronization locks across public methods. (Using
167     * built-in locks works well in the most typical case in which
168     * only one thread submits items).
169     */
170
171    /** The largest possible power of two array size. */
172    static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
173
174    /** Round capacity to power of 2, at most limit. */
175    static final int roundCapacity(int cap) {
176        int n = cap - 1;
177        n |= n >>> 1;
178        n |= n >>> 2;
179        n |= n >>> 4;
180        n |= n >>> 8;
181        n |= n >>> 16;
182        return (n <= 0) ? 1 : // at least 1
183            (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
184    }
185
186    // default Executor setup; nearly the same as CompletableFuture
187
188    /**
189     * Default executor -- ForkJoinPool.commonPool() unless it cannot
190     * support parallelism.
191     */
192    private static final Executor ASYNC_POOL =
193        (ForkJoinPool.getCommonPoolParallelism() > 1) ?
194        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
195
196    /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
197    private static final class ThreadPerTaskExecutor implements Executor {
198        ThreadPerTaskExecutor() {}      // prevent access constructor creation
199        public void execute(Runnable r) { new Thread(r).start(); }
200    }
201
202    /**
203     * Clients (BufferedSubscriptions) are maintained in a linked list
204     * (via their "next" fields). This works well for publish loops.
205     * It requires O(n) traversal to check for duplicate subscribers,
206     * but we expect that subscribing is much less common than
207     * publishing. Unsubscribing occurs only during traversal loops,
208     * when BufferedSubscription methods return negative values
209     * signifying that they have been disabled.  To reduce
210     * head-of-line blocking, submit and offer methods first call
211     * BufferedSubscription.offer on each subscriber, and place
212     * saturated ones in retries list (using nextRetry field), and
213     * retry, possibly blocking or dropping.
214     */
215    BufferedSubscription<T> clients;
216
217    /** Run status, updated only within locks */
218    volatile boolean closed;
219    /** If non-null, the exception in closeExceptionally */
220    volatile Throwable closedException;
221
222    // Parameters for constructing BufferedSubscriptions
223    final Executor executor;
224    final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
225    final int maxBufferCapacity;
226
227    /**
228     * Creates a new SubmissionPublisher using the given Executor for
229     * async delivery to subscribers, with the given maximum buffer size
230     * for each subscriber, and, if non-null, the given handler invoked
231     * when any Subscriber throws an exception in method {@link
232     * Flow.Subscriber#onNext(Object) onNext}.
233     *
234     * @param executor the executor to use for async delivery,
235     * supporting creation of at least one independent thread
236     * @param maxBufferCapacity the maximum capacity for each
237     * subscriber's buffer (the enforced capacity may be rounded up to
238     * the nearest power of two and/or bounded by the largest value
239     * supported by this implementation; method {@link #getMaxBufferCapacity}
240     * returns the actual value)
241     * @param handler if non-null, procedure to invoke upon exception
242     * thrown in method {@code onNext}
243     * @throws NullPointerException if executor is null
244     * @throws IllegalArgumentException if maxBufferCapacity not
245     * positive
246     */
247    public SubmissionPublisher(Executor executor, int maxBufferCapacity,
248                               BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
249        if (executor == null)
250            throw new NullPointerException();
251        if (maxBufferCapacity <= 0)
252            throw new IllegalArgumentException("capacity must be positive");
253        this.executor = executor;
254        this.onNextHandler = handler;
255        this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
256    }
257
258    /**
259     * Creates a new SubmissionPublisher using the given Executor for
260     * async delivery to subscribers, with the given maximum buffer size
261     * for each subscriber, and no handler for Subscriber exceptions in
262     * method {@link Flow.Subscriber#onNext(Object) onNext}.
263     *
264     * @param executor the executor to use for async delivery,
265     * supporting creation of at least one independent thread
266     * @param maxBufferCapacity the maximum capacity for each
267     * subscriber's buffer (the enforced capacity may be rounded up to
268     * the nearest power of two and/or bounded by the largest value
269     * supported by this implementation; method {@link #getMaxBufferCapacity}
270     * returns the actual value)
271     * @throws NullPointerException if executor is null
272     * @throws IllegalArgumentException if maxBufferCapacity not
273     * positive
274     */
275    public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
276        this(executor, maxBufferCapacity, null);
277    }
278
279    /**
280     * Creates a new SubmissionPublisher using the {@link
281     * ForkJoinPool#commonPool()} for async delivery to subscribers
282     * (unless it does not support a parallelism level of at least two,
283     * in which case, a new Thread is created to run each task), with
284     * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
285     * handler for Subscriber exceptions in method {@link
286     * Flow.Subscriber#onNext(Object) onNext}.
287     */
288    public SubmissionPublisher() {
289        this(ASYNC_POOL, Flow.defaultBufferSize(), null);
290    }
291
292    /**
293     * Adds the given Subscriber unless already subscribed.  If already
294     * subscribed, the Subscriber's {@link
295     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
296     * the existing subscription with an {@link IllegalStateException}.
297     * Otherwise, upon success, the Subscriber's {@link
298     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
299     * asynchronously with a new {@link Flow.Subscription}.  If {@link
300     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
301     * subscription is cancelled. Otherwise, if this SubmissionPublisher
302     * was closed exceptionally, then the subscriber's {@link
303     * Flow.Subscriber#onError onError} method is invoked with the
304     * corresponding exception, or if closed without exception, the
305     * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
306     * method is invoked.  Subscribers may enable receiving items by
307     * invoking the {@link Flow.Subscription#request(long) request}
308     * method of the new Subscription, and may unsubscribe by invoking
309     * its {@link Flow.Subscription#cancel() cancel} method.
310     *
311     * @param subscriber the subscriber
312     * @throws NullPointerException if subscriber is null
313     */
314    public void subscribe(Flow.Subscriber<? super T> subscriber) {
315        if (subscriber == null) throw new NullPointerException();
316        BufferedSubscription<T> subscription =
317            new BufferedSubscription<T>(subscriber, executor,
318                                        onNextHandler, maxBufferCapacity);
319        synchronized (this) {
320            for (BufferedSubscription<T> b = clients, pred = null;;) {
321                if (b == null) {
322                    Throwable ex;
323                    subscription.onSubscribe();
324                    if ((ex = closedException) != null)
325                        subscription.onError(ex);
326                    else if (closed)
327                        subscription.onComplete();
328                    else if (pred == null)
329                        clients = subscription;
330                    else
331                        pred.next = subscription;
332                    break;
333                }
334                BufferedSubscription<T> next = b.next;
335                if (b.isDisabled()) { // remove
336                    b.next = null;    // detach
337                    if (pred == null)
338                        clients = next;
339                    else
340                        pred.next = next;
341                }
342                else if (subscriber.equals(b.subscriber)) {
343                    b.onError(new IllegalStateException("Duplicate subscribe"));
344                    break;
345                }
346                else
347                    pred = b;
348                b = next;
349            }
350        }
351    }
352
353    /**
354     * Publishes the given item to each current subscriber by
355     * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
356     * onNext} method, blocking uninterruptibly while resources for any
357     * subscriber are unavailable. This method returns an estimate of
358     * the maximum lag (number of items submitted but not yet consumed)
359     * among all current subscribers. This value is at least one
360     * (accounting for this submitted item) if there are any
361     * subscribers, else zero.
362     *
363     * <p>If the Executor for this publisher throws a
364     * RejectedExecutionException (or any other RuntimeException or
365     * Error) when attempting to asynchronously notify subscribers,
366     * then this exception is rethrown, in which case not all
367     * subscribers will have been issued this item.
368     *
369     * @param item the (non-null) item to publish
370     * @return the estimated maximum lag among subscribers
371     * @throws IllegalStateException if closed
372     * @throws NullPointerException if item is null
373     * @throws RejectedExecutionException if thrown by Executor
374     */
375    public int submit(T item) {
376        if (item == null) throw new NullPointerException();
377        int lag = 0;
378        boolean complete;
379        synchronized (this) {
380            complete = closed;
381            BufferedSubscription<T> b = clients;
382            if (!complete) {
383                BufferedSubscription<T> pred = null, r = null, rtail = null;
384                while (b != null) {
385                    BufferedSubscription<T> next = b.next;
386                    int stat = b.offer(item);
387                    if (stat < 0) {           // disabled
388                        b.next = null;
389                        if (pred == null)
390                            clients = next;
391                        else
392                            pred.next = next;
393                    }
394                    else {
395                        if (stat > lag)
396                            lag = stat;
397                        else if (stat == 0) { // place on retry list
398                            b.nextRetry = null;
399                            if (rtail == null)
400                                r = b;
401                            else
402                                rtail.nextRetry = b;
403                            rtail = b;
404                        }
405                        pred = b;
406                    }
407                    b = next;
408                }
409                while (r != null) {
410                    BufferedSubscription<T> nextRetry = r.nextRetry;
411                    r.nextRetry = null;
412                    int stat = r.submit(item);
413                    if (stat > lag)
414                        lag = stat;
415                    else if (stat < 0 && clients == r)
416                        clients = r.next; // postpone internal unsubscribes
417                    r = nextRetry;
418                }
419            }
420        }
421        if (complete)
422            throw new IllegalStateException("Closed");
423        else
424            return lag;
425    }
426
427    /**
428     * Publishes the given item, if possible, to each current subscriber
429     * by asynchronously invoking its {@link
430     * Flow.Subscriber#onNext(Object) onNext} method. The item may be
431     * dropped by one or more subscribers if resource limits are
432     * exceeded, in which case the given handler (if non-null) is
433     * invoked, and if it returns true, retried once.  Other calls to
434     * methods in this class by other threads are blocked while the
435     * handler is invoked.  Unless recovery is assured, options are
436     * usually limited to logging the error and/or issuing an {@link
437     * Flow.Subscriber#onError(Throwable) onError} signal to the
438     * subscriber.
439     *
440     * <p>This method returns a status indicator: If negative, it
441     * represents the (negative) number of drops (failed attempts to
442     * issue the item to a subscriber). Otherwise it is an estimate of
443     * the maximum lag (number of items submitted but not yet
444     * consumed) among all current subscribers. This value is at least
445     * one (accounting for this submitted item) if there are any
446     * subscribers, else zero.
447     *
448     * <p>If the Executor for this publisher throws a
449     * RejectedExecutionException (or any other RuntimeException or
450     * Error) when attempting to asynchronously notify subscribers, or
451     * the drop handler throws an exception when processing a dropped
452     * item, then this exception is rethrown.
453     *
454     * @param item the (non-null) item to publish
455     * @param onDrop if non-null, the handler invoked upon a drop to a
456     * subscriber, with arguments of the subscriber and item; if it
457     * returns true, an offer is re-attempted (once)
458     * @return if negative, the (negative) number of drops; otherwise
459     * an estimate of maximum lag
460     * @throws IllegalStateException if closed
461     * @throws NullPointerException if item is null
462     * @throws RejectedExecutionException if thrown by Executor
463     */
464    public int offer(T item,
465                     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
466        return doOffer(0L, item, onDrop);
467    }
468
469    /**
470     * Publishes the given item, if possible, to each current subscriber
471     * by asynchronously invoking its {@link
472     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
473     * resources for any subscription are unavailable, up to the
474     * specified timeout or until the caller thread is interrupted, at
475     * which point the given handler (if non-null) is invoked, and if it
476     * returns true, retried once. (The drop handler may distinguish
477     * timeouts from interrupts by checking whether the current thread
478     * is interrupted.)  Other calls to methods in this class by other
479     * threads are blocked while the handler is invoked.  Unless
480     * recovery is assured, options are usually limited to logging the
481     * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
482     * onError} signal to the subscriber.
483     *
484     * <p>This method returns a status indicator: If negative, it
485     * represents the (negative) number of drops (failed attempts to
486     * issue the item to a subscriber). Otherwise it is an estimate of
487     * the maximum lag (number of items submitted but not yet
488     * consumed) among all current subscribers. This value is at least
489     * one (accounting for this submitted item) if there are any
490     * subscribers, else zero.
491     *
492     * <p>If the Executor for this publisher throws a
493     * RejectedExecutionException (or any other RuntimeException or
494     * Error) when attempting to asynchronously notify subscribers, or
495     * the drop handler throws an exception when processing a dropped
496     * item, then this exception is rethrown.
497     *
498     * @param item the (non-null) item to publish
499     * @param timeout how long to wait for resources for any subscriber
500     * before giving up, in units of {@code unit}
501     * @param unit a {@code TimeUnit} determining how to interpret the
502     * {@code timeout} parameter
503     * @param onDrop if non-null, the handler invoked upon a drop to a
504     * subscriber, with arguments of the subscriber and item; if it
505     * returns true, an offer is re-attempted (once)
506     * @return if negative, the (negative) number of drops; otherwise
507     * an estimate of maximum lag
508     * @throws IllegalStateException if closed
509     * @throws NullPointerException if item is null
510     * @throws RejectedExecutionException if thrown by Executor
511     */
512    public int offer(T item, long timeout, TimeUnit unit,
513                     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
514        return doOffer(unit.toNanos(timeout), item, onDrop);
515    }
516
517    /** Common implementation for both forms of offer */
518    final int doOffer(long nanos, T item,
519                      BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
520        if (item == null) throw new NullPointerException();
521        int lag = 0, drops = 0;
522        boolean complete;
523        synchronized (this) {
524            complete = closed;
525            BufferedSubscription<T> b = clients;
526            if (!complete) {
527                BufferedSubscription<T> pred = null, r = null, rtail = null;
528                while (b != null) {
529                    BufferedSubscription<T> next = b.next;
530                    int stat = b.offer(item);
531                    if (stat < 0) {
532                        b.next = null;
533                        if (pred == null)
534                            clients = next;
535                        else
536                            pred.next = next;
537                    }
538                    else {
539                        if (stat > lag)
540                            lag = stat;
541                        else if (stat == 0) {
542                            b.nextRetry = null;
543                            if (rtail == null)
544                                r = b;
545                            else
546                                rtail.nextRetry = b;
547                            rtail = b;
548                        }
549                        else if (stat > lag)
550                            lag = stat;
551                        pred = b;
552                    }
553                    b = next;
554                }
555                while (r != null) {
556                    BufferedSubscription<T> nextRetry = r.nextRetry;
557                    r.nextRetry = null;
558                    int stat = (nanos > 0L)
559                        ? r.timedOffer(item, nanos)
560                        : r.offer(item);
561                    if (stat == 0 && onDrop != null &&
562                        onDrop.test(r.subscriber, item))
563                        stat = r.offer(item);
564                    if (stat == 0)
565                        ++drops;
566                    else if (stat > lag)
567                        lag = stat;
568                    else if (stat < 0 && clients == r)
569                        clients = r.next;
570                    r = nextRetry;
571                }
572            }
573        }
574        if (complete)
575            throw new IllegalStateException("Closed");
576        else
577            return (drops > 0) ? -drops : lag;
578    }
579
580    /**
581     * Unless already closed, issues {@link
582     * Flow.Subscriber#onComplete() onComplete} signals to current
583     * subscribers, and disallows subsequent attempts to publish.
584     * Upon return, this method does <em>NOT</em> guarantee that all
585     * subscribers have yet completed.
586     */
587    public void close() {
588        if (!closed) {
589            BufferedSubscription<T> b;
590            synchronized (this) {
591                // no need to re-check closed here
592                b = clients;
593                clients = null;
594                closed = true;
595            }
596            while (b != null) {
597                BufferedSubscription<T> next = b.next;
598                b.next = null;
599                b.onComplete();
600                b = next;
601            }
602        }
603    }
604
605    /**
606     * Unless already closed, issues {@link
607     * Flow.Subscriber#onError(Throwable) onError} signals to current
608     * subscribers with the given error, and disallows subsequent
609     * attempts to publish.  Future subscribers also receive the given
610     * error. Upon return, this method does <em>NOT</em> guarantee
611     * that all subscribers have yet completed.
612     *
613     * @param error the {@code onError} argument sent to subscribers
614     * @throws NullPointerException if error is null
615     */
616    public void closeExceptionally(Throwable error) {
617        if (error == null)
618            throw new NullPointerException();
619        if (!closed) {
620            BufferedSubscription<T> b;
621            synchronized (this) {
622                b = clients;
623                if (!closed) {  // don't clobber racing close
624                    clients = null;
625                    closedException = error;
626                    closed = true;
627                }
628            }
629            while (b != null) {
630                BufferedSubscription<T> next = b.next;
631                b.next = null;
632                b.onError(error);
633                b = next;
634            }
635        }
636    }
637
638    /**
639     * Returns true if this publisher is not accepting submissions.
640     *
641     * @return true if closed
642     */
643    public boolean isClosed() {
644        return closed;
645    }
646
647    /**
648     * Returns the exception associated with {@link
649     * #closeExceptionally(Throwable) closeExceptionally}, or null if
650     * not closed or if closed normally.
651     *
652     * @return the exception, or null if none
653     */
654    public Throwable getClosedException() {
655        return closedException;
656    }
657
658    /**
659     * Returns true if this publisher has any subscribers.
660     *
661     * @return true if this publisher has any subscribers
662     */
663    public boolean hasSubscribers() {
664        boolean nonEmpty = false;
665        if (!closed) {
666            synchronized (this) {
667                for (BufferedSubscription<T> b = clients; b != null;) {
668                    BufferedSubscription<T> next = b.next;
669                    if (b.isDisabled()) {
670                        b.next = null;
671                        b = clients = next;
672                    }
673                    else {
674                        nonEmpty = true;
675                        break;
676                    }
677                }
678            }
679        }
680        return nonEmpty;
681    }
682
683    /**
684     * Returns the number of current subscribers.
685     *
686     * @return the number of current subscribers
687     */
688    public int getNumberOfSubscribers() {
689        int count = 0;
690        if (!closed) {
691            synchronized (this) {
692                BufferedSubscription<T> pred = null, next;
693                for (BufferedSubscription<T> b = clients; b != null; b = next) {
694                    next = b.next;
695                    if (b.isDisabled()) {
696                        b.next = null;
697                        if (pred == null)
698                            clients = next;
699                        else
700                            pred.next = next;
701                    }
702                    else {
703                        pred = b;
704                        ++count;
705                    }
706                }
707            }
708        }
709        return count;
710    }
711
712    /**
713     * Returns the Executor used for asynchronous delivery.
714     *
715     * @return the Executor used for asynchronous delivery
716     */
717    public Executor getExecutor() {
718        return executor;
719    }
720
721    /**
722     * Returns the maximum per-subscriber buffer capacity.
723     *
724     * @return the maximum per-subscriber buffer capacity
725     */
726    public int getMaxBufferCapacity() {
727        return maxBufferCapacity;
728    }
729
730    /**
731     * Returns a list of current subscribers for monitoring and
732     * tracking purposes, not for invoking {@link Flow.Subscriber}
733     * methods on the subscribers.
734     *
735     * @return list of current subscribers
736     */
737    public List<Flow.Subscriber<? super T>> getSubscribers() {
738        ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
739        synchronized (this) {
740            BufferedSubscription<T> pred = null, next;
741            for (BufferedSubscription<T> b = clients; b != null; b = next) {
742                next = b.next;
743                if (b.isDisabled()) {
744                    b.next = null;
745                    if (pred == null)
746                        clients = next;
747                    else
748                        pred.next = next;
749                }
750                else
751                    subs.add(b.subscriber);
752            }
753        }
754        return subs;
755    }
756
757    /**
758     * Returns true if the given Subscriber is currently subscribed.
759     *
760     * @param subscriber the subscriber
761     * @return true if currently subscribed
762     * @throws NullPointerException if subscriber is null
763     */
764    public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
765        if (subscriber == null) throw new NullPointerException();
766        if (!closed) {
767            synchronized (this) {
768                BufferedSubscription<T> pred = null, next;
769                for (BufferedSubscription<T> b = clients; b != null; b = next) {
770                    next = b.next;
771                    if (b.isDisabled()) {
772                        b.next = null;
773                        if (pred == null)
774                            clients = next;
775                        else
776                            pred.next = next;
777                    }
778                    else if (subscriber.equals(b.subscriber))
779                        return true;
780                    else
781                        pred = b;
782                }
783            }
784        }
785        return false;
786    }
787
788    /**
789     * Returns an estimate of the minimum number of items requested
790     * (via {@link Flow.Subscription#request(long) request}) but not
791     * yet produced, among all current subscribers.
792     *
793     * @return the estimate, or zero if no subscribers
794     */
795    public long estimateMinimumDemand() {
796        long min = Long.MAX_VALUE;
797        boolean nonEmpty = false;
798        synchronized (this) {
799            BufferedSubscription<T> pred = null, next;
800            for (BufferedSubscription<T> b = clients; b != null; b = next) {
801                int n; long d;
802                next = b.next;
803                if ((n = b.estimateLag()) < 0) {
804                    b.next = null;
805                    if (pred == null)
806                        clients = next;
807                    else
808                        pred.next = next;
809                }
810                else {
811                    if ((d = b.demand - n) < min)
812                        min = d;
813                    nonEmpty = true;
814                    pred = b;
815                }
816            }
817        }
818        return nonEmpty ? min : 0;
819    }
820
821    /**
822     * Returns an estimate of the maximum number of items produced but
823     * not yet consumed among all current subscribers.
824     *
825     * @return the estimate
826     */
827    public int estimateMaximumLag() {
828        int max = 0;
829        synchronized (this) {
830            BufferedSubscription<T> pred = null, next;
831            for (BufferedSubscription<T> b = clients; b != null; b = next) {
832                int n;
833                next = b.next;
834                if ((n = b.estimateLag()) < 0) {
835                    b.next = null;
836                    if (pred == null)
837                        clients = next;
838                    else
839                        pred.next = next;
840                }
841                else {
842                    if (n > max)
843                        max = n;
844                    pred = b;
845                }
846            }
847        }
848        return max;
849    }
850
851    /**
852     * Processes all published items using the given Consumer function.
853     * Returns a CompletableFuture that is completed normally when this
854     * publisher signals {@link Flow.Subscriber#onComplete()
855     * onComplete}, or completed exceptionally upon any error, or an
856     * exception is thrown by the Consumer, or the returned
857     * CompletableFuture is cancelled, in which case no further items
858     * are processed.
859     *
860     * @param consumer the function applied to each onNext item
861     * @return a CompletableFuture that is completed normally
862     * when the publisher signals onComplete, and exceptionally
863     * upon any error or cancellation
864     * @throws NullPointerException if consumer is null
865     */
866    public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
867        if (consumer == null)
868            throw new NullPointerException();
869        CompletableFuture<Void> status = new CompletableFuture<>();
870        subscribe(new ConsumerSubscriber<T>(status, consumer));
871        return status;
872    }
873
874    /** Subscriber for method consume */
875    private static final class ConsumerSubscriber<T>
876        implements Flow.Subscriber<T> {
877        final CompletableFuture<Void> status;
878        final Consumer<? super T> consumer;
879        Flow.Subscription subscription;
880        ConsumerSubscriber(CompletableFuture<Void> status,
881                           Consumer<? super T> consumer) {
882            this.status = status; this.consumer = consumer;
883        }
884        public final void onSubscribe(Flow.Subscription subscription) {
885            this.subscription = subscription;
886            status.whenComplete((v, e) -> subscription.cancel());
887            if (!status.isDone())
888                subscription.request(Long.MAX_VALUE);
889        }
890        public final void onError(Throwable ex) {
891            status.completeExceptionally(ex);
892        }
893        public final void onComplete() {
894            status.complete(null);
895        }
896        public final void onNext(T item) {
897            try {
898                consumer.accept(item);
899            } catch (Throwable ex) {
900                subscription.cancel();
901                status.completeExceptionally(ex);
902            }
903        }
904    }
905
906    /**
907     * A task for consuming buffer items and signals, created and
908     * executed whenever they become available. A task consumes as
909     * many items/signals as possible before terminating, at which
910     * point another task is created when needed. The dual Runnable
911     * and ForkJoinTask declaration saves overhead when executed by
912     * ForkJoinPools, without impacting other kinds of Executors.
913     */
914    @SuppressWarnings("serial")
915    static final class ConsumerTask<T> extends ForkJoinTask<Void>
916        implements Runnable, CompletableFuture.AsynchronousCompletionTask {
917        final BufferedSubscription<T> consumer;
918        ConsumerTask(BufferedSubscription<T> consumer) {
919            this.consumer = consumer;
920        }
921        public final Void getRawResult() { return null; }
922        public final void setRawResult(Void v) {}
923        public final boolean exec() { consumer.consume(); return false; }
924        public final void run() { consumer.consume(); }
925    }
926
927    /**
928     * A bounded (ring) buffer with integrated control to start a
929     * consumer task whenever items are available.  The buffer
930     * algorithm is similar to one used inside ForkJoinPool (see its
931     * internal documentation for details) specialized for the case of
932     * at most one concurrent producer and consumer, and power of two
933     * buffer sizes. This allows methods to operate without locks even
934     * while supporting resizing, blocking, task-triggering, and
935     * garbage-free buffers (nulling out elements when consumed),
936     * although supporting these does impose a bit of overhead
937     * compared to plain fixed-size ring buffers.
938     *
939     * The publisher guarantees a single producer via its lock.  We
940     * ensure in this class that there is at most one consumer.  The
941     * request and cancel methods must be fully thread-safe but are
942     * coded to exploit the most common case in which they are only
943     * called by consumers (usually within onNext).
944     *
945     * Execution control is managed using the ACTIVE ctl bit. We
946     * ensure that a task is active when consumable items (and
947     * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
948     * there is demand (unfilled requests).  This is complicated on
949     * the creation side by the possibility of exceptions when trying
950     * to execute tasks. These eventually force DISABLED state, but
951     * sometimes not directly. On the task side, termination (clearing
952     * ACTIVE) that would otherwise race with producers or request()
953     * calls uses the CONSUME keep-alive bit to force a recheck.
954     *
955     * The ctl field also manages run state. When DISABLED, no further
956     * updates are possible. Disabling may be preceded by setting
957     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
958     * case the associated Subscriber methods are invoked, possibly
959     * synchronously if there is no active consumer task (including
960     * cases where execute() failed). The cancel() method is supported
961     * by treating as ERROR but suppressing onError signal.
962     *
963     * Support for blocking also exploits the fact that there is only
964     * one possible waiter. ManagedBlocker-compatible control fields
965     * are placed in this class itself rather than in wait-nodes.
966     * Blocking control relies on the "waiter" field. Producers set
967     * the field before trying to block, but must then recheck (via
968     * offer) before parking. Signalling then just unparks and clears
969     * waiter field. If the producer and/or consumer are using a
970     * ForkJoinPool, the producer attempts to help run consumer tasks
971     * via ForkJoinPool.helpAsyncBlocker before blocking.
972     *
973     * This class uses @Contended and heuristic field declaration
974     * ordering to reduce false-sharing-based memory contention among
975     * instances of BufferedSubscription, but it does not currently
976     * attempt to avoid memory contention among buffers. This field
977     * and element packing can hurt performance especially when each
978     * publisher has only one client operating at a high rate.
979     * Addressing this may require allocating substantially more space
980     * than users expect.
981     */
982    @SuppressWarnings("serial")
983    @jdk.internal.vm.annotation.Contended
984    private static final class BufferedSubscription<T>
985        implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
        // Order-sensitive field declarations (the class comment explains
        // that declaration order is chosen heuristically to reduce false
        // sharing), so do not reorder these.
        long timeout;                      // > 0 if timed wait; 0 if untimed; INTERRUPTED once interrupted
        volatile long demand;              // # unfilled requests
        int maxCapacity;                   // reduced on OOME
        int putStat;                       // offer result for ManagedBlocker
        volatile int ctl;                  // atomic run state flags (see "ctl values" below)
        volatile int head;                 // next position to take
        int tail;                          // next position to put
        Object[] array;                    // buffer: null if disabled
        Flow.Subscriber<? super T> subscriber; // null if disabled
        Executor executor;                 // null if disabled
        // NOTE(review): presumably consulted by handleOnNext (defined
        // outside this section) when onNext throws; confirm there.
        BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
        volatile Throwable pendingError;   // holds until onError issued
        volatile Thread waiter;            // blocked producer thread
        T putItem;                         // for offer within ManagedBlocker
        BufferedSubscription<T> next;      // used only by publisher
        BufferedSubscription<T> nextRetry; // used only by publisher

        // ctl values: bit flags, combined with bitwise OR in the ctl field
        static final int ACTIVE    = 0x01; // consumer task active
        static final int CONSUME   = 0x02; // keep-alive for consumer task
        static final int DISABLED  = 0x04; // final state
        static final int ERROR     = 0x08; // signal onError then disable
        static final int SUBSCRIBE = 0x10; // signal onSubscribe
        static final int COMPLETE  = 0x20; // signal onComplete when done

        // Stored in the timeout field when a blocked offer is interrupted,
        // so callers can tell an interrupt (negative) from a timed wait.
        static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel

        /**
         * Initial buffer capacity used when maxBufferCapacity is
         * greater. Must be a power of two.
         */
        static final int DEFAULT_INITIAL_CAP = 32;
1019
1020        BufferedSubscription(Flow.Subscriber<? super T> subscriber,
1021                             Executor executor,
1022                             BiConsumer<? super Flow.Subscriber<? super T>,
1023                             ? super Throwable> onNextHandler,
1024                             int maxBufferCapacity) {
1025            this.subscriber = subscriber;
1026            this.executor = executor;
1027            this.onNextHandler = onNextHandler;
1028            this.maxCapacity = maxBufferCapacity;
1029            this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
1030                                    (maxBufferCapacity < 2 ? // at least 2 slots
1031                                     2 : maxBufferCapacity) :
1032                                    DEFAULT_INITIAL_CAP];
1033        }
1034
1035        final boolean isDisabled() {
1036            return ctl == DISABLED;
1037        }
1038
1039        /**
1040         * Returns estimated number of buffered items, or -1 if
1041         * disabled.
1042         */
1043        final int estimateLag() {
1044            int n;
1045            return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
1046        }
1047
        /**
         * Tries to add item and start consumer task if necessary.
         * The common case stores directly into the current array when
         * it has room for one more element; everything else (no array,
         * full array, disabled state) is delegated to growAndAdd.
         *
         * @param item the item to buffer
         * @return -1 if disabled, 0 if dropped, else estimated lag
         */
        final int offer(T item) {
            int h = head, t = tail, cap, size, stat;
            Object[] a = array;
            // Fast path: size counts the elements including the new one.
            if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
                a[(cap - 1) & t] = item;    // relaxed writes OK
                tail = t + 1;
                stat = size;
            }
            else
                stat = growAndAdd(a, item);
            // After a successful add, make sure a consumer task is running
            // with its keep-alive bit set. Skip the call when both ACTIVE
            // and CONSUME are already present.
            return (stat > 0 &&
                    (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
                startOnOffer(stat) : stat;
        }
1066
        /**
         * Tries to create or expand buffer, then adds item if possible.
         * Called by offer when its fast path does not apply.
         *
         * @param a the array read by the caller (may be null)
         * @param item the item to add
         * @return -1 if ERROR or DISABLED is set, 0 if the item could
         * not be added, else a positive value
         */
        private int growAndAdd(Object[] a, T item) {
            boolean alloc;
            int cap, stat;
            if ((ctl & (ERROR | DISABLED)) != 0) {
                // terminating or terminated: reject the item
                cap = 0;
                stat = -1;
                alloc = false;
            }
            else if (a == null || (cap = a.length) <= 0) {
                // no usable buffer yet: allocate one
                cap = 0;
                stat = 1;
                alloc = true;
            }
            else {
                VarHandle.fullFence();           // recheck
                // Re-read head and tail after the fence; there may be
                // room now that was not visible to the caller.
                int h = head, t = tail, size = t + 1 - h;
                if (cap >= size) {
                    a[(cap - 1) & t] = item;
                    tail = t + 1;
                    stat = size;
                    alloc = false;
                }
                else if (cap >= maxCapacity) {
                    stat = 0;                    // cannot grow
                    alloc = false;
                }
                else {
                    stat = cap + 1;
                    alloc = true;
                }
            }
            if (alloc) {
                // Double the capacity, or start at 1 if there was no array.
                int newCap = (cap > 0) ? cap << 1 : 1;
                if (newCap <= cap)               // shift overflowed
                    stat = 0;
                else {
                    Object[] newArray = null;
                    try {
                        newArray = new Object[newCap];
                    } catch (Throwable ex) {     // try to cope with OOME
                    }
                    if (newArray == null) {
                        // Allocation failed: cap future growth at the
                        // current size and report the item as not added.
                        if (cap > 0)
                            maxCapacity = cap;   // avoid continuous failure
                        stat = 0;
                    }
                    else {
                        array = newArray;
                        int t = tail;
                        int newMask = newCap - 1;
                        if (a != null && cap > 0) {
                            int mask = cap - 1;
                            // Move elements from head up to tail into the
                            // new array. Each slot is claimed with a CAS to
                            // null, so an element the consumer takes first
                            // is not copied.
                            for (int j = head; j != t; ++j) {
                                int k = j & mask;
                                Object x = QA.getAcquire(a, k);
                                if (x != null && // races with consumer
                                    QA.compareAndSet(a, k, x, null))
                                    newArray[j & newMask] = x;
                            }
                        }
                        newArray[t & newMask] = item;
                        tail = t + 1;
                    }
                }
            }
            return stat;
        }
1137
1138        /**
1139         * Spins/helps/blocks while offer returns 0.  Called only if
1140         * initial offer return 0.
1141         */
1142        final int submit(T item) {
1143            int stat;
1144            if ((stat = offer(item)) == 0) {
1145                putItem = item;
1146                timeout = 0L;
1147                putStat = 0;
1148                ForkJoinPool.helpAsyncBlocker(executor, this);
1149                if ((stat = putStat) == 0) {
1150                    try {
1151                        ForkJoinPool.managedBlock(this);
1152                    } catch (InterruptedException ie) {
1153                        timeout = INTERRUPTED;
1154                    }
1155                    stat = putStat;
1156                }
1157                if (timeout < 0L)
1158                    Thread.currentThread().interrupt();
1159            }
1160            return stat;
1161        }
1162
1163        /**
1164         * Timeout version; similar to submit.
1165         */
1166        final int timedOffer(T item, long nanos) {
1167            int stat;
1168            if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1169                putItem = item;
1170                putStat = 0;
1171                ForkJoinPool.helpAsyncBlocker(executor, this);
1172                if ((stat = putStat) == 0) {
1173                    try {
1174                        ForkJoinPool.managedBlock(this);
1175                    } catch (InterruptedException ie) {
1176                        timeout = INTERRUPTED;
1177                    }
1178                    stat = putStat;
1179                }
1180                if (timeout < 0L)
1181                    Thread.currentThread().interrupt();
1182            }
1183            return stat;
1184        }
1185
        /**
         * Tries to start consumer task after offer.
         *
         * @param stat the offer result to pass through
         * @return -1 if now disabled, else argument
         * @throws RuntimeException or Error rethrown from
         * Executor.execute, after the ACTIVE bit has been backed out
         */
        private int startOnOffer(int stat) {
            for (;;) {
                Executor e; int c;
                if ((c = ctl) == DISABLED || (e = executor) == null) {
                    stat = -1;
                    break;
                }
                else if ((c & ACTIVE) != 0) { // ensure keep-alive
                    if ((c & CONSUME) != 0 ||
                        CTL.compareAndSet(this, c, c | CONSUME))
                        break;
                }
                // Not active: nothing to start without demand or items.
                else if (demand == 0L || tail == head)
                    break;
                // Claim ACTIVE first, then hand a new task to the executor.
                else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
                    try {
                        e.execute(new ConsumerTask<T>(this));
                        break;
                    } catch (RuntimeException | Error ex) { // back out
                        // Clear ACTIVE (retrying on CAS failure) unless the
                        // subscription became disabled or ACTIVE is already
                        // clear, then propagate the failure.
                        do {} while (((c = ctl) & DISABLED) == 0 &&
                                     (c & ACTIVE) != 0 &&
                                     !CTL.weakCompareAndSet
                                     (this, c, c & ~ACTIVE));
                        throw ex;
                    }
                }
            }
            return stat;
        }
1219
        /**
         * Clears the waiter field, then unparks the given thread.
         *
         * @param w the thread to unpark; detach() may pass null, for
         * which LockSupport.unpark is documented to have no effect
         */
        private void signalWaiter(Thread w) {
            waiter = null;
            LockSupport.unpark(w);    // release producer
        }
1224
        /**
         * Nulls out most fields, mainly to avoid garbage retention
         * until publisher unsubscribes, but also to help cleanly stop
         * upon error by nulling required components. Also releases a
         * blocked producer, if any.
         */
        private void detach() {
            Thread w = waiter;        // capture before the fields are cleared
            executor = null;
            subscriber = null;
            pendingError = null;
            signalWaiter(w);          // clears waiter and unparks w
        }
1237
1238        /**
1239         * Issues error signal, asynchronously if a task is running,
1240         * else synchronously.
1241         */
1242        final void onError(Throwable ex) {
1243            for (int c;;) {
1244                if (((c = ctl) & (ERROR | DISABLED)) != 0)
1245                    break;
1246                else if ((c & ACTIVE) != 0) {
1247                    pendingError = ex;
1248                    if (CTL.compareAndSet(this, c, c | ERROR))
1249                        break; // cause consumer task to exit
1250                }
1251                else if (CTL.compareAndSet(this, c, DISABLED)) {
1252                    Flow.Subscriber<? super T> s = subscriber;
1253                    if (s != null && ex != null) {
1254                        try {
1255                            s.onError(ex);
1256                        } catch (Throwable ignore) {
1257                        }
1258                    }
1259                    detach();
1260                    break;
1261                }
1262            }
1263        }
1264
1265        /**
1266         * Tries to start consumer task upon a signal or request;
1267         * disables on failure.
1268         */
1269        private void startOrDisable() {
1270            Executor e;
1271            if ((e = executor) != null) { // skip if already disabled
1272                try {
1273                    e.execute(new ConsumerTask<T>(this));
1274                } catch (Throwable ex) {  // back out and force signal
1275                    for (int c;;) {
1276                        if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
1277                            break;
1278                        if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
1279                            onError(ex);
1280                            break;
1281                        }
1282                    }
1283                }
1284            }
1285        }
1286
1287        final void onComplete() {
1288            for (int c;;) {
1289                if ((c = ctl) == DISABLED)
1290                    break;
1291                if (CTL.compareAndSet(this, c,
1292                                      c | (ACTIVE | CONSUME | COMPLETE))) {
1293                    if ((c & ACTIVE) == 0)
1294                        startOrDisable();
1295                    break;
1296                }
1297            }
1298        }
1299
1300        final void onSubscribe() {
1301            for (int c;;) {
1302                if ((c = ctl) == DISABLED)
1303                    break;
1304                if (CTL.compareAndSet(this, c,
1305                                      c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1306                    if ((c & ACTIVE) == 0)
1307                        startOrDisable();
1308                    break;
1309                }
1310            }
1311        }
1312
1313        /**
1314         * Causes consumer task to exit if active (without reporting
1315         * onError unless there is already a pending error), and
1316         * disables.
1317         */
1318        public void cancel() {
1319            for (int c;;) {
1320                if ((c = ctl) == DISABLED)
1321                    break;
1322                else if ((c & ACTIVE) != 0) {
1323                    if (CTL.compareAndSet(this, c,
1324                                          c | (CONSUME | ERROR)))
1325                        break;
1326                }
1327                else if (CTL.compareAndSet(this, c, DISABLED)) {
1328                    detach();
1329                    break;
1330                }
1331            }
1332        }
1333
        /**
         * Adds to demand and possibly starts task. A non-positive
         * argument is reported through onError with an
         * IllegalArgumentException instead of being thrown.
         *
         * @param n the number of additional items requested
         */
        public void request(long n) {
            if (n > 0L) {
                for (;;) {
                    long prev = demand, d;
                    if ((d = prev + n) < prev) // saturate
                        d = Long.MAX_VALUE;
                    if (DEMAND.compareAndSet(this, prev, d)) {
                        // Demand recorded; now make sure a consumer task
                        // will notice it.
                        for (int c, h;;) {
                            if ((c = ctl) == DISABLED)
                                break;
                            else if ((c & ACTIVE) != 0) {
                                // task running: just ensure keep-alive
                                if ((c & CONSUME) != 0 ||
                                    CTL.compareAndSet(this, c, c | CONSUME))
                                    break;
                            }
                            else if ((h = head) != tail) {
                                // items are buffered and no task is
                                // active: claim ACTIVE and start one
                                if (CTL.compareAndSet(this, c,
                                                      c | (ACTIVE|CONSUME))) {
                                    startOrDisable();
                                    break;
                                }
                            }
                            else if (head == h && tail == h)
                                break;          // else stale
                            // retry only while demand remains
                            if (demand == 0L)
                                break;
                        }
                        break;
                    }
                }
            }
            else
                onError(new IllegalArgumentException(
                            "non-positive subscription request"));
        }
1372
1373        public final boolean isReleasable() { // for ManagedBlocker
1374            T item = putItem;
1375            if (item != null) {
1376                if ((putStat = offer(item)) == 0)
1377                    return false;
1378                putItem = null;
1379            }
1380            return true;
1381        }
1382
        /**
         * ManagedBlocker method: repeatedly retries the pending offer
         * until it returns non-zero, parking between attempts. The
         * latest result is stored in putStat. An interrupt is recorded
         * by setting timeout to INTERRUPTED. In timed mode (timeout
         * greater than zero on entry) the loop also ends on interrupt
         * or once the deadline has passed.
         *
         * @return true always
         */
        public final boolean block() { // for ManagedBlocker
            T item = putItem;
            if (item != null) {
                putItem = null;
                long nanos = timeout;
                long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
                while ((putStat = offer(item)) == 0) {
                    if (Thread.interrupted()) {
                        timeout = INTERRUPTED;
                        if (nanos > 0L)      // untimed waits keep trying
                            break;
                    }
                    else if (nanos > 0L &&
                             (nanos = deadline - System.nanoTime()) <= 0L)
                        break;               // timed out
                    // Register as waiter first, then loop so that offer
                    // is rechecked before actually parking.
                    else if (waiter == null)
                        waiter = Thread.currentThread();
                    else {
                        if (nanos > 0L)
                            LockSupport.parkNanos(this, nanos);
                        else
                            LockSupport.park(this);
                        waiter = null;
                    }
                }
            }
            waiter = null;
            return true;
        }
1412
        /**
         * Consumer loop, called from ConsumerTask, or indirectly
         * when helping during submit. Each iteration handles the first
         * applicable case: a control event, an apparently empty
         * buffer, zero demand, or delivery of one item. The loop ends
         * when one of the check methods returns false.
         */
        final void consume() {
            Flow.Subscriber<? super T> s;
            int h = head;
            if ((s = subscriber) != null) {           // else disabled
                for (;;) {
                    long d = demand;
                    int c; Object[] a; int n, i; Object x; Thread w;
                    // Control events take precedence over items.
                    if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
                        if (!checkControl(s, c))
                            break;
                    }
                    // No array, head caught up with tail, or the slot at
                    // head reads as null.
                    else if ((a = array) == null || h == tail ||
                             (n = a.length) == 0 ||
                             (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
                        if (!checkEmpty(s, c))
                            break;
                    }
                    // An item is available but nothing has been requested.
                    else if (d == 0L) {
                        if (!checkDemand(c))
                            break;
                    }
                    // Ensure the keep-alive bit is set, then claim the
                    // slot by CASing it to null. If either step fails,
                    // the loop simply retries.
                    else if (((c & CONSUME) != 0 ||
                              CTL.compareAndSet(this, c, c | CONSUME)) &&
                             QA.compareAndSet(a, i, x, null)) {
                        HEAD.setRelease(this, ++h);
                        DEMAND.getAndAdd(this, -1L);
                        // a slot was freed: release a blocked producer
                        if ((w = waiter) != null)
                            signalWaiter(w);
                        try {
                            @SuppressWarnings("unchecked") T y = (T) x;
                            s.onNext(y);
                        } catch (Throwable ex) {
                            handleOnNext(s, ex);
                        }
                    }
                }
            }
        }
1455
        /**
         * Responds to control events in consume(). SUBSCRIBE is
         * handled first, then ERROR; any other case (consume() calls
         * this only when SUBSCRIBE, ERROR or DISABLED is set)
         * detaches.
         *
         * @param s the subscriber to signal
         * @param c the ctl value observed by the caller
         * @return false if the consumer loop should exit, else true
         */
        private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
            boolean stat = true;
            if ((c & SUBSCRIBE) != 0) {
                // Clear the flag first, so that onSubscribe is issued only
                // by the thread whose CAS succeeds.
                if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
                    try {
                        if (s != null)
                            s.onSubscribe(this);
                    } catch (Throwable ex) {
                        onError(ex);
                    }
                }
            }
            else if ((c & ERROR) != 0) {
                Throwable ex = pendingError;
                ctl = DISABLED;           // no need for CAS
                if (ex != null) {         // null if errorless cancel
                    try {
                        if (s != null)
                            s.onError(ex);
                    } catch (Throwable ignore) {
                    }
                }
                // Returns true; the caller's next pass sees DISABLED and
                // reaches the detach branch below.
            }
            else {
                detach();
                stat = false;
            }
            return stat;
        }
1488
1489        /**
1490         * Responds to apparent emptiness in consume().
1491         */
1492        private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
1493            boolean stat = true;
1494            if (head == tail) {
1495                if ((c & CONSUME) != 0)
1496                    CTL.compareAndSet(this, c, c & ~CONSUME);
1497                else if ((c & COMPLETE) != 0) {
1498                    if (CTL.compareAndSet(this, c, DISABLED)) {
1499                        try {
1500                            if (s != null)
1501                                s.onComplete();
1502                        } catch (Throwable ignore) {
1503                        }
1504                    }
1505                }
1506                else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1507                    stat = false;
1508            }
1509            return stat;
1510        }
1511
1512        /**
1513         * Responds to apparent zero demand in consume().
1514         */
1515        private boolean checkDemand(int c) {
1516            boolean stat = true;
1517            if (demand == 0L) {
1518                if ((c & CONSUME) != 0)
1519                    CTL.compareAndSet(this, c, c & ~CONSUME);
1520                else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
1521                    stat = false;
1522            }
1523            return stat;
1524        }
1525
1526        /**
1527         * Processes exception in Subscriber.onNext.
1528         */
1529        private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
1530            BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
1531            if ((h = onNextHandler) != null) {
1532                try {
1533                    h.accept(s, ex);
1534                } catch (Throwable ignore) {
1535                }
1536            }
1537            onError(ex);
1538        }
1539
1540        // VarHandle mechanics
1541        private static final VarHandle CTL;
1542        private static final VarHandle TAIL;
1543        private static final VarHandle HEAD;
1544        private static final VarHandle DEMAND;
1545        private static final VarHandle QA;
1546
1547        static {
1548            try {
1549                MethodHandles.Lookup l = MethodHandles.lookup();
1550                CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
1551                                      int.class);
1552                TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
1553                                       int.class);
1554                HEAD = l.findVarHandle(BufferedSubscription.class, "head",
1555                                       int.class);
1556                DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
1557                                         long.class);
1558                QA = MethodHandles.arrayElementVarHandle(Object[].class);
1559            } catch (ReflectiveOperationException e) {
1560                throw new Error(e);
1561            }
1562
1563            // Reduce the risk of rare disastrous classloading in first call to
1564            // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
1565            Class<?> ensureLoaded = LockSupport.class;
1566        }
1567    }
1568}
1569