1/*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package jdk.incubator.http.internal.common;
26
27
28import java.io.Closeable;
29import java.io.IOException;
30import java.nio.ByteBuffer;
31import java.util.ArrayList;
32import java.util.Arrays;
33import java.util.Deque;
34import java.util.List;
35import java.util.concurrent.ConcurrentLinkedDeque;
36import java.util.concurrent.atomic.AtomicInteger;
37import java.util.function.BiConsumer;
38
39public class AsyncWriteQueue implements Closeable {
40
41    private static final int IDLE    = 0;     // nobody is flushing from the queue
42    private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
43    private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
44                                              // the other thread put data into the queue.
45                                              // flushing thread should recheck queue before switching to idle state.
46    private static final int DELAYED = 3;     // flushing is delayed
47                                              // either by PlainHttpConnection.WriteEvent registration, or
48                                              // SSL handshaking
49
50    private static final int CLOSED = 4;      // queue is closed
51
52    private final AtomicInteger state = new AtomicInteger(IDLE);
53    private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
54    private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;
55
56    // Queue may be processed in two modes:
57    // 1. if(!doFullDrain) - invoke callback on each chunk
58    // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
59    private final boolean doFullDrain;
60
61    private ByteBufferReference[] delayedElement = null;
62
63    public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
64        this(consumeAction, true);
65    }
66
67    public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) {
68        this.consumeAction = consumeAction;
69        this.doFullDrain = doFullDrain;
70    }
71
72    public void put(ByteBufferReference[] e) throws IOException {
73        ensureOpen();
74        queue.addLast(e);
75    }
76
77    public void putFirst(ByteBufferReference[] e) throws IOException {
78        ensureOpen();
79        queue.addFirst(e);
80    }
81
82    /**
83     * retruns true if flushing was performed
84     * @return
85     * @throws IOException
86     */
87    public boolean flush() throws IOException {
88        while(true) {
89            switch (state.get()) {
90                case IDLE:
91                    if(state.compareAndSet(IDLE, FLUSHING)) {
92                        flushLoop();
93                        return true;
94                    }
95                    break;
96                case FLUSHING:
97                    if(state.compareAndSet(FLUSHING, REFLUSHING)) {
98                        return false;
99                    }
100                    break;
101                case REFLUSHING:
102                case DELAYED:
103                    return false;
104                case CLOSED:
105                    throw new IOException("Queue closed");
106            }
107        }
108    }
109
110    /*
111     *  race invocations of flushDelayed are not allowed.
112     *  flushDelayed should be invoked only from:
113     *   - SelectorManager thread
114     *   - Handshaking thread
115     */
116    public void flushDelayed() throws IOException {
117        ensureOpen();
118        if(!state.compareAndSet(DELAYED, FLUSHING)) {
119            ensureOpen(); // if CAS failed when close was set - throw proper exception
120            throw new RuntimeException("Shouldn't happen");
121        }
122        flushLoop();
123    }
124
125    private ByteBufferReference[] drain(ByteBufferReference[] prev) {
126        assert prev != null;
127        if(doFullDrain) {
128            ByteBufferReference[] next = queue.poll();
129            if(next == null) {
130                return prev;
131            }
132            List<ByteBufferReference> drained = new ArrayList<>();
133            drained.addAll(Arrays.asList(prev));
134            drained.addAll(Arrays.asList(next));
135            while ((next = queue.poll()) != null) {
136                drained.addAll(Arrays.asList(next));
137            }
138            return drained.toArray(new ByteBufferReference[0]);
139        } else {
140            return prev;
141        }
142    }
143
144    private ByteBufferReference[] drain() {
145        ByteBufferReference[] next = queue.poll();
146        return next == null ? null : drain(next);
147    }
148
149    private void flushLoop() throws IOException {
150        ByteBufferReference[] element;
151        if (delayedElement != null) {
152            element = drain(delayedElement);
153            delayedElement = null;
154        } else {
155            element = drain();
156        }
157        while(true) {
158            while (element != null) {
159                consumeAction.accept(element, this);
160                if (state.get() == DELAYED) {
161                    return;
162                }
163                element = drain();
164            }
165            switch (state.get()) {
166                case IDLE:
167                case DELAYED:
168                    throw new RuntimeException("Shouldn't happen");
169                case FLUSHING:
170                    if(state.compareAndSet(FLUSHING, IDLE)) {
171                        return;
172                    }
173                    break;
174                case REFLUSHING:
175                    // We need to check if new elements were put after last poll() and do graceful exit
176                    state.compareAndSet(REFLUSHING, FLUSHING);
177                    break;
178                case CLOSED:
179                    throw new IOException("Queue closed");
180            }
181            element = drain();
182        }
183    }
184
185    /*
186     * The methods returns unprocessed chunk of buffers into beginning of the queue.
187     * Invocation of the method allowed only inside consume callback,
188     * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
189     */
190    public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
191        while(true) {
192            int state = this.state.get();
193            switch (state) {
194                case IDLE:
195                case DELAYED:
196                    throw new RuntimeException("Shouldn't happen");
197                case FLUSHING:
198                case REFLUSHING:
199                    if(this.state.compareAndSet(state, DELAYED)) {
200                        this.delayedElement = delayedElement;
201                        return;
202                    }
203                    break;
204                case CLOSED:
205                    throw new IOException("Queue closed");
206            }
207        }
208
209    }
210
211    private void ensureOpen() throws IOException {
212        if (state.get() == CLOSED) {
213            throw new IOException("Queue closed");
214        }
215    }
216
217    @Override
218    public void close() throws IOException {
219        state.getAndSet(CLOSED);
220    }
221
222}
223