/*
 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
 * A {@link Flow.Publisher} whose items are pulled from a {@link Supplier}.
 *
 * <p>Every call to {@link #subscribe} creates a subscription that submits one
 * task to the executor. That task blocks until the subscriber requests items,
 * then calls the supplier once per requested item and forwards the result to
 * the subscriber. An empty {@link Optional} from the supplier ends the stream
 * with {@code onComplete}.
 *
 * @param <T> the type of the published items
 */
class DefaultPublisher<T> implements Flow.Publisher<T> {

    private final Supplier<Optional<T>> supplier;
    // this executor will be wrapped in another executor
    // which may override it and just run in the calling thread
    // if it knows the user call is blocking
    private final Executor executor;

    /**
     * Creates a publisher backed by the given supplier.
     *
     * @param supplier returns non-empty Optionals for each item, then an
     *                 empty Optional to mark the end of the stream
     * @param executor runs the delivery task of each subscription; the task
     *                 blocks while it waits for demand
     */
    DefaultPublisher(Supplier<Optional<T>> supplier, Executor executor) {
        this.supplier = supplier;
        this.executor = executor;
    }

    /**
     * Creates a new subscription for {@code subscriber} and hands it over via
     * {@code onSubscribe}. If the executor rejects the delivery task, the
     * subscriber receives {@code onError} with an
     * {@link IllegalStateException} wrapping the rejection.
     *
     * @param subscriber the subscriber to attach
     * @throws NullPointerException if {@code subscriber} is null
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Fail fast: constructing the Subscription submits a task to the
        // executor, and that task would block forever if we only discovered
        // the null when calling onSubscribe.
        Objects.requireNonNull(subscriber, "subscriber");
        try {
            subscriber.onSubscribe(new Subscription(subscriber));
        } catch (RejectedExecutionException e) {
            subscriber.onError(new IllegalStateException(e));
        }
    }

    /**
     * Subscription tying one subscriber to the supplier. Demand is
     * accumulated in {@code demand}; a single task on the executor drains it.
     */
    private class Subscription implements Flow.Subscription {

        private final Flow.Subscriber<? super T> subscriber;
        // Set once the subscription is cancelled, completed or failed.
        private final AtomicBoolean done = new AtomicBoolean();

        // Number of items requested but not yet picked up by the delivery task.
        private final AtomicLong demand = new AtomicLong();

        // Lock/condition used to park the delivery task while there is no demand.
        private final Lock consumerLock = new ReentrantLock();
        private final Condition consumerAlarm = consumerLock.newCondition();

        /**
         * Stores the subscriber and submits the delivery task.
         *
         * @throws RejectedExecutionException if the executor refuses the task
         */
        Subscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
            executor.execute(this::deliverUntilDone);
        }

        /** Delivery loop: wait for demand, drain it, repeat until done. */
        private void deliverUntilDone() {
            try {
                while (!done.get()) {
                    awaitDemandOrDone();

                    long nbItemsDemanded = demand.getAndSet(0);
                    for (long i = 0; i < nbItemsDemanded && !done.get(); i++) {
                        deliverNext();
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status before reporting the failure.
                Thread.currentThread().interrupt();
                if (done.compareAndSet(false, true)) {
                    subscriber.onError(e);
                }
            }
        }

        /** Blocks until there is outstanding demand or the subscription is done. */
        private void awaitDemandOrDone() throws InterruptedException {
            consumerLock.lock();
            try {
                while (!done.get() && demand.get() == 0) {
                    consumerAlarm.await();
                }
            } finally {
                consumerLock.unlock();
            }
        }

        /**
         * Pulls one element from the supplier and signals the subscriber:
         * onNext for a value, onComplete for an empty Optional, onError if the
         * supplier or the subscriber throws a RuntimeException.
         */
        private void deliverNext() {
            try {
                Optional<T> item = Objects.requireNonNull(supplier.get());
                if (item.isPresent()) {
                    subscriber.onNext(item.get());
                } else if (done.compareAndSet(false, true)) {
                    subscriber.onComplete();
                }
            } catch (RuntimeException e) {
                if (done.compareAndSet(false, true)) {
                    subscriber.onError(e);
                }
            }
        }

        /**
         * Adds {@code n} to the outstanding demand, saturating at
         * {@code Long.MAX_VALUE}. A non-positive {@code n} terminates the
         * subscription with an {@link IllegalArgumentException} delivered via
         * {@code onError}. Calls made after the subscription is done are
         * ignored.
         */
        @Override
        public void request(long n) {
            if (n <= 0) {
                if (done.compareAndSet(false, true)) {
                    try {
                        subscriber.onError(new IllegalArgumentException("request(" + n + ")"));
                    } finally {
                        // Release the delivery task parked in await();
                        // otherwise it would hold an executor thread forever.
                        wakeConsumer();
                    }
                }
                return;
            }
            if (!done.get()) {
                // d >= 0 and n > 0, so a non-positive sum means overflow.
                demand.updateAndGet(d -> (d + n > 0) ? d + n : Long.MAX_VALUE);
                wakeConsumer();
            }
        }

        /** Marks the subscription done and releases the delivery task. */
        @Override
        public void cancel() {
            done.set(true);
            wakeConsumer();
        }

        /** Signals the delivery task so it re-checks {@code done} and {@code demand}. */
        private void wakeConsumer() {
            consumerLock.lock();
            try {
                consumerAlarm.signal();
            } finally {
                consumerLock.unlock();
            }
        }

    }
}