1/* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
*/
package jdk.incubator.http.internal.common;


import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/**
 * A queue of {@code ByteBufferReference[]} chunks that are handed to a
 * consume callback by a flush loop.
 *
 * <p>An atomic state variable coordinates the callers: a thread enters the
 * flush loop only after winning a compare-and-set into {@code FLUSHING}
 * (from {@code IDLE} in {@link #flush()}, from {@code DELAYED} in
 * {@link #flushDelayed()}). A concurrent {@link #flush()} that finds the
 * queue already being flushed merely flags it ({@code REFLUSHING}) so that
 * the flushing thread re-checks the queue before going idle.
 *
 * <p>The consume callback may hand an unprocessed chunk back through
 * {@link #setDelayed(ByteBufferReference[])}; flushing then stops until
 * {@link #flushDelayed()} is invoked.
 */
public class AsyncWriteQueue implements Closeable {

    private static final int IDLE = 0;       // nobody is flushing from the queue
    private static final int FLUSHING = 1;   // exactly one thread is flushing from the queue
    private static final int REFLUSHING = 2; // while one thread was flushing from the queue
                                             // another thread asked for a flush;
                                             // the flushing thread should recheck the queue
                                             // before switching to the idle state
    private static final int DELAYED = 3;    // flushing is delayed
                                             // either by PlainHttpConnection.WriteEvent registration, or
                                             // SSL handshaking

    private static final int CLOSED = 4;     // queue is closed

    private final AtomicInteger state = new AtomicInteger(IDLE);
    private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
    private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;

    // Queue may be processed in two modes:
    // 1. if(!doFullDrain) - invoke callback on each chunk
    // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into a single array and invoke callback
    private final boolean doFullDrain;

    // Chunk handed back by the consume callback through setDelayed(); it is
    // processed first by the next flush loop. Deliberately not volatile: it is
    // written before the CAS that publishes DELAYED and read after the CAS
    // that leaves DELAYED, so the atomic state variable orders the accesses.
    private ByteBufferReference[] delayedElement = null;

    /**
     * Creates a queue that drains fully: all queued chunks are merged into a
     * single array before the callback is invoked.
     *
     * @param consumeAction callback receiving the chunk(s) to process and this queue
     */
    public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
        this(consumeAction, true);
    }

    /**
     * Creates a queue.
     *
     * @param consumeAction callback receiving the chunk(s) to process and this queue
     * @param doFullDrain   {@code true} to merge all queued chunks into one array
     *                      per callback invocation, {@code false} to invoke the
     *                      callback once per queued chunk
     */
    public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction,
                           boolean doFullDrain) {
        this.consumeAction = consumeAction;
        this.doFullDrain = doFullDrain;
    }

    /**
     * Appends a chunk to the tail of the queue. Does not trigger flushing.
     *
     * @param e the chunk to enqueue
     * @throws IOException if the queue is closed
     */
    public void put(ByteBufferReference[] e) throws IOException {
        ensureOpen();
        queue.addLast(e);
    }

    /**
     * Inserts a chunk at the head of the queue. Does not trigger flushing.
     *
     * @param e the chunk to enqueue
     * @throws IOException if the queue is closed
     */
    public void putFirst(ByteBufferReference[] e) throws IOException {
        ensureOpen();
        queue.addFirst(e);
    }

    /**
     * Attempts to flush the queue on the calling thread.
     *
     * @return {@code true} if this invocation ran the flush loop;
     *         {@code false} if another thread is already flushing (it is
     *         flagged to re-check the queue) or flushing is currently delayed
     * @throws IOException if the queue is closed
     */
    public boolean flush() throws IOException {
        while (true) {
            switch (state.get()) {
                case IDLE:
                    if (state.compareAndSet(IDLE, FLUSHING)) {
                        flushLoop();
                        return true;
                    }
                    break; // lost the race, re-read the state
                case FLUSHING:
                    // ask the flushing thread to look at the queue once more
                    if (state.compareAndSet(FLUSHING, REFLUSHING)) {
                        return false;
                    }
                    break; // lost the race, re-read the state
                case REFLUSHING:
                case DELAYED:
                    return false;
                case CLOSED:
                    throw new IOException("Queue closed");
            }
        }
    }

    /*
     * Resumes flushing after a delay, starting with the delayed chunk.
     *
     * Racing invocations of flushDelayed are not allowed.
     * flushDelayed should be invoked only from:
     *   - SelectorManager thread
     *   - Handshaking thread
     */
    public void flushDelayed() throws IOException {
        ensureOpen();
        if (!state.compareAndSet(DELAYED, FLUSHING)) {
            ensureOpen(); // if CAS failed because close() was called - throw the proper exception
            throw new RuntimeException("Shouldn't happen");
        }
        flushLoop();
    }

    /*
     * Returns the next unit of work given an already polled chunk.
     * In full-drain mode the rest of the queue is polled and appended to
     * prev; otherwise prev is returned as is.
     */
    private ByteBufferReference[] drain(ByteBufferReference[] prev) {
        assert prev != null;
        if (!doFullDrain) {
            return prev;
        }
        ByteBufferReference[] next = queue.poll();
        if (next == null) {
            return prev; // nothing to merge, avoid copying
        }
        List<ByteBufferReference> drained = new ArrayList<>(prev.length + next.length);
        drained.addAll(Arrays.asList(prev));
        drained.addAll(Arrays.asList(next));
        while ((next = queue.poll()) != null) {
            drained.addAll(Arrays.asList(next));
        }
        return drained.toArray(new ByteBufferReference[0]);
    }

    /*
     * Polls the next unit of work from the queue, or returns null if the
     * queue is empty.
     */
    private ByteBufferReference[] drain() {
        ByteBufferReference[] next = queue.poll();
        return next == null ? null : drain(next);
    }

    /*
     * Feeds queued chunks to the consume callback until the queue is empty
     * and the state could be switched back to IDLE, the callback delayed
     * flushing, or the queue was closed.
     * Must be entered only by the thread that moved the state to FLUSHING.
     */
    private void flushLoop() throws IOException {
        ByteBufferReference[] element;
        if (delayedElement != null) {
            // a chunk handed back by setDelayed() goes first
            element = drain(delayedElement);
            delayedElement = null;
        } else {
            element = drain();
        }
        while (true) {
            while (element != null) {
                consumeAction.accept(element, this);
                int afterCallback = state.get();
                if (afterCallback == DELAYED) {
                    // the callback invoked setDelayed(); flushDelayed() resumes later
                    return;
                }
                if (afterCallback == CLOSED) {
                    // close() may have overwritten DELAYED; do not hand further
                    // chunks to the callback ahead of a possibly delayed one
                    throw new IOException("Queue closed");
                }
                // NOTE(review): if flushDelayed() runs on another thread before the
                // state is read above, DELAYED is missed here - presumably callers
                // guarantee that cannot happen; confirm against the call sites.
                element = drain();
            }
            switch (state.get()) {
                case IDLE:
                case DELAYED:
                    throw new RuntimeException("Shouldn't happen");
                case FLUSHING:
                    if (state.compareAndSet(FLUSHING, IDLE)) {
                        return;
                    }
                    break;
                case REFLUSHING:
                    // We need to check if new elements were put after last poll() and do graceful exit
                    state.compareAndSet(REFLUSHING, FLUSHING);
                    break;
                case CLOSED:
                    throw new IOException("Queue closed");
            }
            element = drain();
        }
    }

    /*
     * Hands an unprocessed chunk of buffers back so that it is processed
     * first when flushing resumes, and switches the queue to DELAYED.
     * Invocation of the method is allowed only inside the consume callback,
     * and the consume callback is invoked only when the queue is in the
     * FLUSHING or REFLUSHING state.
     */
    public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
        while (true) {
            int current = this.state.get();
            switch (current) {
                case IDLE:
                case DELAYED:
                    throw new RuntimeException("Shouldn't happen");
                case FLUSHING:
                case REFLUSHING:
                    // Store the chunk BEFORE publishing DELAYED: once the state is
                    // DELAYED another thread may enter flushDelayed() and read the
                    // field, so it must already hold the chunk at that point.
                    this.delayedElement = delayedElement;
                    if (this.state.compareAndSet(current, DELAYED)) {
                        return;
                    }
                    break; // state changed concurrently, re-read it
                case CLOSED:
                    throw new IOException("Queue closed");
            }
        }
    }

    /* Throws IOException if the queue has been closed. */
    private void ensureOpen() throws IOException {
        if (state.get() == CLOSED) {
            throw new IOException("Queue closed");
        }
    }

    /**
     * Marks the queue as closed. Subsequent {@code put}, {@code putFirst},
     * {@code flush}, {@code flushDelayed} and {@code setDelayed} calls throw
     * {@link IOException}. Chunks still in the queue are not removed.
     */
    @Override
    public void close() throws IOException {
        state.set(CLOSED);
    }

}