1/*
2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http;
27
28import java.util.Objects;
29import java.util.Optional;
30import java.util.concurrent.Executor;
31import java.util.concurrent.Flow;
32import java.util.concurrent.RejectedExecutionException;
33import java.util.concurrent.atomic.AtomicBoolean;
34import java.util.concurrent.atomic.AtomicLong;
35import java.util.concurrent.locks.Condition;
36import java.util.concurrent.locks.Lock;
37import java.util.concurrent.locks.ReentrantLock;
38import java.util.function.Supplier;
39
/**
 * A {@link Flow.Publisher} that pulls its items from a {@link Supplier}.
 *
 * <p>Every call to {@link #subscribe} creates an independent subscription
 * that submits one emitter task to the executor. That task blocks until the
 * subscriber has requested items, then calls the supplier once per requested
 * item and forwards the result to the subscriber.
 *
 * @param <T> the type of the published items
 */
class DefaultPublisher<T> implements Flow.Publisher<T> {

    private final Supplier<Optional<T>> supplier;
    // this executor will be wrapped in another executor
    // which may override it and just run in the calling thread
    // if it knows the user call is blocking
    private final Executor executor;

    /**
     * Creates a publisher backed by the given supplier.
     *
     * @param supplier returns a non-empty Optional for every item to publish
     *                 and an empty Optional once there is nothing left, which
     *                 completes the subscription
     * @param executor runs the emitter task of each subscription
     */
    DefaultPublisher(Supplier<Optional<T>> supplier, Executor executor) {
        this.supplier = supplier;
        this.executor = executor;
    }

    /**
     * Creates a new subscription for {@code subscriber} and hands it over
     * through {@code onSubscribe}. If the executor rejects the emitter task,
     * the subscriber receives {@code onError} with an
     * {@link IllegalStateException} wrapping the rejection.
     *
     * @param subscriber the subscriber to feed
     * @throws NullPointerException if {@code subscriber} is null
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Fail before the subscription is built: its constructor submits the
        // emitter task, which would otherwise be left waiting forever for
        // demand that a null subscriber can never request.
        Objects.requireNonNull(subscriber, "subscriber");
        try {
            subscriber.onSubscribe(new Subscription(subscriber));
        } catch (RejectedExecutionException e) {
            subscriber.onError(new IllegalStateException(e));
        }
    }

    /**
     * Subscription state for one subscriber: the outstanding demand, the
     * terminal flag, and the lock/condition used to park the emitter task
     * while there is nothing to do.
     */
    private class Subscription implements Flow.Subscription {

        private final Flow.Subscriber<? super T> subscriber;
        // Set exactly once (completion, error or cancellation); whoever wins
        // the compareAndSet is the one that signals the subscriber.
        private final AtomicBoolean done = new AtomicBoolean();

        // Number of items requested but not yet picked up by the emitter.
        private final AtomicLong demand = new AtomicLong();

        private final Lock consumerLock = new ReentrantLock();
        private final Condition consumerAlarm = consumerLock.newCondition();

        /**
         * Stores the subscriber and submits the emitter task to the executor.
         * A {@link RejectedExecutionException} thrown by the executor
         * propagates to the caller.
         */
        Subscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
            executor.execute(this::runEmitter);
        }

        /** Emitter task body: alternates between waiting for demand and serving it until done. */
        private void runEmitter() {
            try {
                while (!done.get()) {
                    awaitDemand();
                    emit(demand.getAndSet(0));
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status for whoever owns the thread.
                Thread.currentThread().interrupt();
                if (done.compareAndSet(false, true)) {
                    subscriber.onError(e);
                }
            }
        }

        /** Parks the emitter until items have been requested or the subscription is done. */
        private void awaitDemand() throws InterruptedException {
            consumerLock.lock();
            try {
                while (!done.get() && demand.get() == 0) {
                    consumerAlarm.await();
                }
            } finally {
                consumerLock.unlock();
            }
        }

        /** Pulls up to {@code count} items from the supplier and forwards them, stopping early once done. */
        private void emit(long count) {
            for (long i = 0; i < count && !done.get(); i++) {
                try {
                    Optional<T> item = Objects.requireNonNull(supplier.get());
                    if (item.isPresent()) {
                        subscriber.onNext(item.get());
                    } else if (done.compareAndSet(false, true)) {
                        // An empty Optional marks the end of the data.
                        subscriber.onComplete();
                    }
                } catch (RuntimeException e) {
                    if (done.compareAndSet(false, true)) {
                        subscriber.onError(e);
                    }
                }
            }
        }

        /**
         * Adds {@code n} to the outstanding demand (saturating at
         * {@code Long.MAX_VALUE}) and wakes the emitter. A non-positive
         * {@code n} terminates the subscription and signals {@code onError}
         * with an {@link IllegalArgumentException}. Calls made after the
         * subscription is done are ignored.
         */
        @Override
        public void request(long n) {
            if (n <= 0) {
                if (done.compareAndSet(false, true)) {
                    // Release the emitter first: without this signal it could
                    // stay parked in awaitDemand() forever and pin its thread.
                    wakeConsumer();
                    subscriber.onError(new IllegalArgumentException("request(" + n + ")"));
                }
                return;
            }
            if (!done.get()) {
                // d + n wraps to a non-positive value on overflow; clamp instead.
                demand.updateAndGet(d -> (d + n > 0) ? d + n : Long.MAX_VALUE);
                wakeConsumer();
            }
        }

        /** Marks the subscription as done and wakes the emitter so that it can exit. */
        @Override
        public void cancel() {
            done.set(true);
            wakeConsumer();
        }

        /** Signals the condition the emitter waits on; the lock must be held to signal. */
        private void wakeConsumer() {
            consumerLock.lock();
            try {
                consumerAlarm.signal();
            } finally {
                consumerLock.unlock();
            }
        }

    }
}
144