1/*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package jdk.incubator.http.internal.common;
26
27
28import java.io.Closeable;
29import java.io.IOException;
30import java.util.ArrayList;
31import java.util.Arrays;
32import java.util.Deque;
33import java.util.List;
34import java.util.concurrent.ConcurrentLinkedDeque;
35import java.util.concurrent.atomic.AtomicInteger;
36
37public class AsyncWriteQueue implements Closeable {
38
39    @FunctionalInterface
40    public static interface AsyncConsumer {
41        /**
42         * Takes an array of buffer reference and attempt to send the data
43         * downstream. If not all the data can be sent, then push back
44         * to the source queue by calling {@code source.setDelayed(buffers)}
45         * and return false. If all the data was successfully sent downstream
46         * then returns true.
47         * @param buffers An array of ButeBufferReference containing data
48         *                to send downstream.
49         * @param source This AsyncWriteQueue.
50         * @return true if all the data could be sent downstream, false otherwise.
51         */
52        boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
53    }
54
55    private static final int IDLE    = 0;     // nobody is flushing from the queue
56    private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
57    private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
58                                              // the other thread put data into the queue.
59                                              // flushing thread should recheck queue before switching to idle state.
60    private static final int DELAYED = 3;     // flushing is delayed
61                                              // either by PlainHttpConnection.WriteEvent registration, or
62                                              // SSL handshaking
63
64    private static final int CLOSED = 4;      // queue is closed
65
66    private final AtomicInteger state = new AtomicInteger(IDLE);
67    private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
68    private final AsyncConsumer consumeAction;
69
70    // Queue may be processed in two modes:
71    // 1. if(!doFullDrain) - invoke callback on each chunk
72    // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
73    private final boolean doFullDrain;
74
75    private ByteBufferReference[] delayedElement = null;
76
77    public AsyncWriteQueue(AsyncConsumer consumeAction) {
78        this(consumeAction, true);
79    }
80
81    public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
82        this.consumeAction = consumeAction;
83        this.doFullDrain = doFullDrain;
84    }
85
86    public void put(ByteBufferReference[] e) throws IOException {
87        ensureOpen();
88        queue.addLast(e);
89    }
90
91    public void putFirst(ByteBufferReference[] e) throws IOException {
92        ensureOpen();
93        queue.addFirst(e);
94    }
95
96    /**
97     * retruns true if flushing was performed
98     * @return
99     * @throws IOException
100     */
101    public boolean flush() throws IOException {
102        while(true) {
103            switch (state.get()) {
104                case IDLE:
105                    if(state.compareAndSet(IDLE, FLUSHING)) {
106                        flushLoop();
107                        return true;
108                    }
109                    break;
110                case FLUSHING:
111                    if(state.compareAndSet(FLUSHING, REFLUSHING)) {
112                        return false;
113                    }
114                    break;
115                case REFLUSHING:
116                case DELAYED:
117                    return false;
118                case CLOSED:
119                    throw new IOException("Queue closed");
120            }
121        }
122    }
123
124    /*
125     *  race invocations of flushDelayed are not allowed.
126     *  flushDelayed should be invoked only from:
127     *   - SelectorManager thread
128     *   - Handshaking thread
129     */
130    public void flushDelayed() throws IOException {
131        ensureOpen();
132        if(!state.compareAndSet(DELAYED, FLUSHING)) {
133            ensureOpen(); // if CAS failed when close was set - throw proper exception
134            throw new RuntimeException("Shouldn't happen");
135        }
136        flushLoop();
137    }
138
139    private ByteBufferReference[] drain(ByteBufferReference[] prev) {
140        assert prev != null;
141        if(doFullDrain) {
142            ByteBufferReference[] next = queue.poll();
143            if(next == null) {
144                return prev;
145            }
146            List<ByteBufferReference> drained = new ArrayList<>();
147            drained.addAll(Arrays.asList(prev));
148            drained.addAll(Arrays.asList(next));
149            while ((next = queue.poll()) != null) {
150                drained.addAll(Arrays.asList(next));
151            }
152            return drained.toArray(new ByteBufferReference[0]);
153        } else {
154            return prev;
155        }
156    }
157
158    private ByteBufferReference[] drain() {
159        ByteBufferReference[] next = queue.poll();
160        return next == null ? null : drain(next);
161    }
162
163    private void flushLoop() throws IOException {
164        ByteBufferReference[] element;
165        if (delayedElement != null) {
166            element = drain(delayedElement);
167            delayedElement = null;
168        } else {
169            element = drain();
170        }
171        while(true) {
172            while (element != null) {
173                if (!consumeAction.trySend(element, this)) {
174                    return;
175                }
176                element = drain();
177            }
178            switch (state.get()) {
179                case IDLE:
180                case DELAYED:
181                    throw new RuntimeException("Shouldn't happen");
182                case FLUSHING:
183                    if(state.compareAndSet(FLUSHING, IDLE)) {
184                        return;
185                    }
186                    break;
187                case REFLUSHING:
188                    // We need to check if new elements were put after last poll() and do graceful exit
189                    state.compareAndSet(REFLUSHING, FLUSHING);
190                    break;
191                case CLOSED:
192                    throw new IOException("Queue closed");
193            }
194            element = drain();
195        }
196    }
197
198    /*
199     * The methods returns unprocessed chunk of buffers into beginning of the queue.
200     * Invocation of the method allowed only inside consume callback,
201     * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
202     */
203    public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
204        while(true) {
205            int state = this.state.get();
206            switch (state) {
207                case IDLE:
208                case DELAYED:
209                    throw new RuntimeException("Shouldn't happen");
210                case FLUSHING:
211                case REFLUSHING:
212                    if(this.state.compareAndSet(state, DELAYED)) {
213                        this.delayedElement = delayedElement;
214                        return;
215                    }
216                    break;
217                case CLOSED:
218                    throw new IOException("Queue closed");
219            }
220        }
221
222    }
223
224    private void ensureOpen() throws IOException {
225        if (state.get() == CLOSED) {
226            throw new IOException("Queue closed");
227        }
228    }
229
230    @Override
231    public void close() throws IOException {
232        state.getAndSet(CLOSED);
233    }
234
235}
236