Http2Connection.java revision 16234:3b25414eb6af
/*
 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import jdk.incubator.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Formatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Encoder;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;

import static jdk.incubator.http.internal.frame.SettingsFrame.*;


/**
 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
 *
 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
 * to a HttpClientImpl.
 *
 * Creation cases:
 * 1) upgraded HTTP/1.1 plain tcp connection
 * 2) prior knowledge directly created plain tcp connection
 * 3) directly created HTTP/2 SSL connection which uses ALPN.
 *
 * Sending is done by writing directly to underlying HttpConnection object which
 * is operating in async mode. No flow control applies on output at this level
 * and all writes are just executed as puts to an output Q belonging to HttpConnection
 * Flow control is implemented by HTTP/2 protocol itself.
 *
 * Hpack header compression
 * and outgoing stream creation is also done here, because these operations
 * must be synchronized at the socket level. Stream objects send frames simply
 * by placing them on the connection's output Queue. sendFrame() is called
 * from a higher level (Stream) thread.
 *
 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
 * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
 * or handles them directly itself. This thread performs hpack decompression
 * and incoming stream creation (Server push). Incoming frames destined for a
 * stream are provided by calling Stream.incoming().
 */
class Http2Connection {


    /*
     * ByteBuffer pooling strategy for HTTP/2 protocol:
     *
     * In general there are 4 points where ByteBuffers are used:
     *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing
     *    encrypted data in case of SSL connection.
     *
     * 1. Outgoing frames encoded to ByteBuffers.
     *    Outgoing ByteBuffers are created with the required size and are
     *    frequently small (except DataFrames, etc).
     *    At this place no pools at all. All outgoing buffers should be
     *    collected by GC.
     *
     * 2. Incoming ByteBuffers (decoded to frames).
     *    Here, total elimination of BB pool is not a good idea.
     *    We don't know how many bytes we will receive through network.
     *    So here we allocate buffer of reasonable size. The following life of the BB:
     *    - If all frames decoded from the BB are other than DataFrame and
     *      HeaderFrame (and HeaderFrame subclasses) BB is returned to pool,
     *    - If we decoded DataFrame from the BB. In that case DataFrame refers
     *      to subbuffer obtained by slice() method.
     *      Such BB is never returned to pool and will be GCed.
     *    - If we decoded HeadersFrame from the BB. Then header decoding is
     *      performed inside processFrame method and the buffer could be
     *      released to pool.
     *
     * 3. SSL encrypted buffers. Here another pool was introduced and all net
     *    buffers are to/from the pool, because we can't predict the size of
     *    encrypted packets.
     *
     */


    // A small class that allows to control the state of
    // the connection preface. This is just a thin wrapper
    // over a CountDownLatch.
    private final class PrefaceController {
        volatile boolean prefaceSent;
        private final CountDownLatch latch = new CountDownLatch(1);

        // This method returns immediately if the preface is sent,
        // and blocks until the preface is sent if not.
        // In the common case, where the preface is already sent,
        // this will cost not more than a volatile read.
        void waitUntilPrefaceSent() {
            if (prefaceSent) {
                return;
            }
            try {
                // If the preface is not sent then await on the latch
                Log.logTrace("Waiting until connection preface is sent");
                latch.await();
                Log.logTrace("Preface sent: resuming reading");
                assert prefaceSent;
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here;
                // the connection is shut down instead. Confirm whether the
                // calling (selector) thread expects the flag to be re-set.
                String msg = Utils.stackTrace(e);
                Log.logTrace(msg);
                shutdown(e);
            }
        }

        // Mark that the connection preface is sent
        void markPrefaceSent() {
            assert !prefaceSent;
            prefaceSent = true;
            // Release the latch. If asyncReceive was scheduled it will
            // be waiting for the release and will be woken up by this
            // call. If not, then the latch will no longer be used after
            // this.
            latch.countDown();
        }

        boolean isPrefaceSent() {
            return prefaceSent;
        }
    }

    // Set by shutdown(); consulted by the send methods to avoid reporting
    // I/O errors that merely result from the connection being closed.
    volatile boolean closed;

    //-------------------------------------
    final HttpConnection connection;
    private final HttpClientImpl client;
    private final Http2ClientImpl client2;
    private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
    private int nextstreamid;
    private int nextPushStream = 2;
    private final Encoder hpackOut;
    private final Decoder hpackIn;
    final SettingsFrame clientSettings;
    private volatile SettingsFrame serverSettings;
    // Effective INITIAL_WINDOW_SIZE of the peer. Kept separately from
    // serverSettings because a SETTINGS frame that does not carry this
    // parameter must leave the current value in force.
    private volatile int serverInitialWindowSize;
    private final String key; // for HttpClientImpl.connections map
    private final FramesDecoder framesDecoder;
    private final FramesEncoder framesEncoder = new FramesEncoder();

    /**
     * Send Window controller for both connection and stream windows.
     * Each of this connection's Streams MUST use this controller.
     */
    private final WindowController windowController = new WindowController();
    private final PrefaceController prefaceController = new PrefaceController();
    final WindowUpdateSender windowUpdater;

    static final int DEFAULT_FRAME_SIZE = 16 * 1024;


    // TODO: need list of control frames from other threads
    // that need to be sent

    /**
     * Common initialisation shared by the two creation paths.
     *
     * @param connection   the underlying connection
     * @param client2      the owning HTTP/2 client
     * @param nextstreamid the id to assign to the next locally created stream
     * @param key          the key under which this connection is pooled
     */
    private Http2Connection(HttpConnection connection,
                            Http2ClientImpl client2,
                            int nextstreamid,
                            String key) {
        this.connection = connection;
        this.client = client2.client();
        this.client2 = client2;
        this.nextstreamid = nextstreamid;
        this.key = key;
        this.clientSettings = this.client2.getClientSettings();
        this.framesDecoder = new FramesDecoder(this::processFrame,
                clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
        // serverSettings will be updated by server
        this.serverSettings = SettingsFrame.getDefaultSettings();
        this.serverInitialWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
        this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
        this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
        this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
    }

    /**
     * Case 1) Create from upgraded HTTP/1.1 connection.
     * Is ready to use. Will not be SSL. exchange is the Exchange
     * that initiated the connection, whose response will be delivered
     * on a Stream.
     */
    Http2Connection(HttpConnection connection,
                    Http2ClientImpl client2,
                    Exchange<?> exchange,
                    ByteBuffer initial)
        throws IOException, InterruptedException
    {
        this(connection,
             client2,
             3, // stream 1 is registered during the upgrade
             keyFor(connection));
        assert !(connection instanceof SSLConnection);
        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        Stream<?> initialStream = createStream(exchange);
        initialStream.registerStream(1);
        windowController.registerStream(1, getInitialSendWindowSize());
        initialStream.requestSent();
        sendConnectionPreface();
        // start reading and writing
        AsyncConnection asyncConn = (AsyncConnection)connection;
        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
        // feed the bytes already read during the upgrade before reading more
        asyncReceive(ByteBufferReference.of(initial));
        asyncConn.startReading();
    }

    /**
     * Async style factory for case 1), but completes immediately: the
     * connection is built synchronously and any IOException or
     * InterruptedException is delivered through the returned future.
     */
    static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
                                                          Http2ClientImpl client2,
                                                          Exchange<?> exchange,
                                                          ByteBuffer initial) {
        CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
        try {
            Http2Connection c = new Http2Connection(connection, client2, exchange, initial);
            cf.complete(c);
        } catch (IOException | InterruptedException e) {
            cf.completeExceptionally(e);
        }
        return cf;
    }

    /**
     * Cases 2) 3)
     *
     * request is request to be sent.
     */
    Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
        throws IOException, InterruptedException
    {
        this(HttpConnection.getConnection(request.getAddress(h2client.client()),
                                          h2client.client(), request, true),
             h2client,
             1,
             keyFor(request.uri(), request.proxy(h2client.client())));
        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

        connection.connect();
        // start reading
        AsyncConnection asyncConn = (AsyncConnection)connection;
        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
        asyncConn.startReading();
        sendConnectionPreface();
    }

    /** Builds the pooling key from an existing connection's properties. */
    static String keyFor(HttpConnection connection) {
        boolean isProxy = connection.isProxied();
        boolean isSecure = connection.isSecure();
        InetSocketAddress addr = connection.address();

        return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
    }

    /**
     * Builds the pooling key for a request URI. When a proxy is given the
     * proxy's host and port are used, otherwise the URI's.
     */
    static String keyFor(URI uri, InetSocketAddress proxy) {
        boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
        boolean isProxy = proxy != null;

        String host;
        int port;

        if (isProxy) {
            host = proxy.getHostString();
            port = proxy.getPort();
        } else {
            host = uri.getHost();
            port = uri.getPort();
        }
        return keyString(isSecure, isProxy, host, port);
    }

    // {C,S}:{H:P}:host:port
    // C indicates clear text connection "http"
    // S indicates secure "https"
    // H indicates host (direct) connection
    // P indicates proxy
    // Eg: "S:H:foo.com:80"
    static String keyString(boolean secure, boolean proxy, String host, int port) {
        return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
    }

    String key() {
        return this.key;
    }

    /** Registers this connection with the owning Http2ClientImpl. */
    void putConnection() {
        client2.putConnection(this);
    }

    /**
     * Renders the remaining bytes of the buffer as colon separated hex
     * octets ("0a:ff:10"). The buffer position is restored before returning.
     * An empty buffer yields an empty string.
     */
    private static String toHexdump1(ByteBuffer bb) {
        bb.mark();
        StringBuilder sb = new StringBuilder(512);
        Formatter f = new Formatter(sb);

        while (bb.hasRemaining()) {
            int i = Byte.toUnsignedInt(bb.get());
            f.format("%02x:", i);
        }
        // drop the trailing ':' - but only if something was written,
        // otherwise deleteCharAt(-1) would throw
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1);
        }
        bb.reset();
        return sb.toString();
    }

    /**
     * Renders the remaining bytes of the buffer as space separated groups of
     * two bytes (four hex digits, the last group may hold a single byte).
     * The buffer position is restored before returning.
     */
    private static String toHexdump(ByteBuffer bb) {
        StringBuilder sb = new StringBuilder();
        int i = 0;
        bb.mark();
        while (bb.hasRemaining()) {
            if (i > 0 && i % 2 == 0) {
                sb.append(' ');
            }
            byte b = bb.get();
            // 256 + value always has three hex digits; dropping the first
            // leaves the zero padded two digit form
            sb.append(Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1));
            i++;
        }
        bb.reset();
        return sb.toString();
    }

    /**
     * Runs the header block of the given frame through the inbound hpack
     * decoder, reporting decoded fields to the given callback. The decoder
     * is told the block is complete only on the last buffer of a frame that
     * has END_HEADERS set.
     */
    private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
        boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);

        ByteBufferReference[] buffers = frame.getHeaderBlock();
        int last = buffers.length - 1;
        for (int i = 0; i <= last; i++) {
            hpackIn.decode(buffers[i].get(), endOfHeaders && (i == last), decoder);
        }
    }

    /**
     * Returns the initial send window size to use for newly registered
     * streams: the last INITIAL_WINDOW_SIZE explicitly advertised by the
     * server, or the value from the default settings if none was received.
     */
    int getInitialSendWindowSize() {
        return serverInitialWindowSize;
    }

    /** Sends a GOAWAY frame with NO_ERROR to the peer. */
    void close() {
        // explicit charset: do not depend on the platform default encoding
        GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR,
                "Requested by user".getBytes(StandardCharsets.UTF_8));
        // TODO: set last stream. For now zero ok.
        sendFrame(f);
    }

    private final ByteBufferPool readBufferPool = new ByteBufferPool();

    // provides buffer to read data (default size)
    public ByteBufferReference getReadBuffer() {
        return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
    }

    private final Object readlock = new Object();

    /**
     * Callback invoked with bytes read from the connection. Decoding is
     * serialized on readlock; any failure while decoding shuts the
     * connection down.
     */
    public void asyncReceive(ByteBufferReference buffer) {
        // We don't need to read anything and
        // we don't want to send anything back to the server
        // until the connection preface has been sent.
        // Therefore we're going to wait if needed before reading
        // (and thus replying) to anything.
        // Starting to reply to something (e.g send an ACK to a
        // SettingsFrame sent by the server) before the connection
        // preface is fully sent might result in the server
        // sending a GOAWAY frame with 'invalid_preface'.
        prefaceController.waitUntilPrefaceSent();
        synchronized (readlock) {
            assert prefaceController.isPrefaceSent();
            try {
                framesDecoder.decode(buffer);
            } catch (Throwable e) {
                String msg = Utils.stackTrace(e);
                Log.logTrace(msg);
                shutdown(e);
            }
        }
    }


    /**
     * Marks the connection closed, removes it from the owning client,
     * cancels every registered stream with the given cause and closes the
     * underlying connection.
     */
    void shutdown(Throwable t) {
        Log.logError(t);
        closed = true;
        client2.deleteConnection(this);
        // iterate over a snapshot of the registered streams
        List<Stream<?>> c = new ArrayList<>(streams.values());
        for (Stream<?> s : c) {
            s.cancelImpl(t);
        }
        connection.close();
    }

    /**
     * Handles stream 0 (common) frames that apply to whole connection and passes
     * other stream specific frames to that Stream object.
     *
     * Invokes Stream.incoming() which is expected to process frame without
     * blocking.
     */
    void processFrame(Http2Frame frame) throws IOException {
        Log.logFrames(frame, "IN");
        int streamid = frame.streamid();
        if (frame instanceof MalformedFrame) {
            MalformedFrame malformed = (MalformedFrame) frame;
            Log.logError(malformed.getMessage());
            if (streamid == 0) {
                protocolError(malformed.getErrorCode());
            } else {
                resetStream(streamid, malformed.getErrorCode());
            }
            return;
        }
        if (streamid == 0) {
            handleConnectionFrame(frame);
            return;
        }
        if (frame instanceof SettingsFrame) {
            // The stream identifier for a SETTINGS frame MUST be zero
            protocolError(GoAwayFrame.PROTOCOL_ERROR);
            return;
        }

        Stream<?> stream = getStream(streamid);
        if (stream == null) {
            // Should never receive a frame with unknown stream id

            // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
            // response to a RST_STREAM frame.
            if (!(frame instanceof ResetFrame)) {
                resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
            }
            return;
        }
        if (frame instanceof PushPromiseFrame) {
            PushPromiseFrame pp = (PushPromiseFrame)frame;
            handlePushPromise(stream, pp);
        } else {
            if (frame instanceof HeaderFrame) {
                // decode headers (or continuation)
                decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
            }
            stream.incoming(frame);
        }
    }

    /**
     * Handles a PUSH_PROMISE received on the given parent stream. The
     * promised stream id must be the next expected push stream id, otherwise
     * the promised stream is reset. On success a pushed stream is created,
     * registered and handed to the parent stream.
     */
    private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
        throws IOException
    {
        HttpRequestImpl parentReq = parent.request;
        int promisedStreamid = pp.getPromisedStream();
        if (promisedStreamid != nextPushStream) {
            resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
            return;
        }
        nextPushStream += 2;

        HeaderDecoder decoder = new HeaderDecoder();
        decodeHeaders(pp, decoder);
        HttpHeadersImpl headers = decoder.headers();
        HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
        Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
        Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
        pushExch.exchImpl = pushStream;
        pushStream.registerStream(promisedStreamid);
        parent.incoming_pushPromise(pushReq, pushStream);
    }

    /**
     * Dispatches a frame received on stream 0 by frame type. Any type other
     * than SETTINGS, PING, GOAWAY or WINDOW_UPDATE is a protocol error.
     */
    private void handleConnectionFrame(Http2Frame frame)
        throws IOException
    {
        switch (frame.type()) {
          case SettingsFrame.TYPE:
              handleSettings((SettingsFrame)frame);
              break;
          case PingFrame.TYPE:
              handlePing((PingFrame)frame);
              break;
          case GoAwayFrame.TYPE:
              handleGoAway((GoAwayFrame)frame);
              break;
          case WindowUpdateFrame.TYPE:
              handleWindowUpdate((WindowUpdateFrame)frame);
              break;
          default:
            protocolError(ErrorFrame.PROTOCOL_ERROR);
        }
    }

    /** Sends RST_STREAM with the given error code, then forgets the stream. */
    void resetStream(int streamid, int code) throws IOException {
        Log.logError(
            "Resetting stream {0,number,integer} with error code {1,number,integer}",
            streamid, code);
        ResetFrame frame = new ResetFrame(streamid, code);
        sendFrame(frame);
        closeStream(streamid);
    }

    /** Unregisters the stream and, unless it was pushed, its send window. */
    void closeStream(int streamid) {
        Stream<?> s = streams.remove(streamid);
        // ## Remove s != null. It is a hack for delayed cancellation,reset
        if (s != null && !(s instanceof Stream.PushedStream)) {
            // Since PushStreams have no request body, then they have no
            // corresponding entry in the window controller.
            windowController.removeStream(streamid);
        }
    }

    /**
     * Increments this connection's send Window by the amount in the given frame.
     */
    private void handleWindowUpdate(WindowUpdateFrame f)
        throws IOException
    {
        int amount = f.getUpdate();
        if (amount > 0) {
            boolean success = windowController.increaseConnectionWindow(amount);
            if (!success) {
                protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
            }
        }
        // else:
        // ## temporarily disable to workaround a bug in Jetty where it
        // ## sends Window updates with a 0 update value.
        //protocolError(ErrorFrame.PROTOCOL_ERROR);
    }

    /** Sends GOAWAY with the given error code and shuts the connection down. */
    private void protocolError(int errorCode)
        throws IOException
    {
        GoAwayFrame frame = new GoAwayFrame(0, errorCode);
        sendFrame(frame);
        shutdown(new IOException("protocol error"));
    }

    /**
     * Applies a SETTINGS frame received from the server and acknowledges it.
     * Frames with the ACK flag set are ignored.
     */
    private void handleSettings(SettingsFrame frame)
        throws IOException
    {
        assert frame.streamid() == 0;
        if (frame.getFlag(SettingsFrame.ACK)) {
            return;
        }
        int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
        // -1 is what getParameter reports for a parameter the frame does not
        // carry (see getMaxSendFrameSize). In that case the current window
        // size stays in force and active streams must not be adjusted.
        if (newWindowSize != -1) {
            int diff = newWindowSize - serverInitialWindowSize;
            if (diff != 0) {
                windowController.adjustActiveStreams(diff);
            }
            serverInitialWindowSize = newWindowSize;
        }
        // NOTE(review): this replaces the stored settings wholesale, so any
        // parameter absent from this frame reads back as unset afterwards -
        // confirm whether a merge with the previous settings is wanted.
        serverSettings = frame;
        sendFrame(new SettingsFrame(SettingsFrame.ACK));
    }

    /** Replies to a PING by echoing the same frame with the ACK flag set. */
    private void handlePing(PingFrame frame)
        throws IOException
    {
        frame.setFlag(PingFrame.ACK);
        sendUnorderedFrame(frame);
    }

    /** A received GOAWAY shuts the whole connection down. */
    private void handleGoAway(GoAwayFrame frame)
        throws IOException
    {
        shutdown(new IOException(
                        String.valueOf(connection.channel().getLocalAddress())
                        +": GOAWAY received"));
    }

    /**
     * Max frame size we are allowed to send
     */
    public int getMaxSendFrameSize() {
        int param = serverSettings.getParameter(MAX_FRAME_SIZE);
        if (param == -1) {
            param = DEFAULT_FRAME_SIZE;
        }
        return param;
    }

    /**
     * Max frame size we will receive
     */
    public int getMaxReceiveFrameSize() {
        return clientSettings.getParameter(MAX_FRAME_SIZE);
    }

    // Not sure how useful this is.
    public int getMaxHeadersSize() {
        return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
    }

    private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

    private static final byte[] PREFACE_BYTES =
        CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);

    /**
     * Sends Connection preface and Settings frame with current preferred
     * values
     */
    private void sendConnectionPreface() throws IOException {
        Log.logTrace("{0}: start sending connection preface to {1}",
                     connection.channel().getLocalAddress(),
                     connection.address());
        SettingsFrame sf = client2.getClientSettings();
        ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
        Log.logFrames(sf, "OUT");
        // send preface bytes and SettingsFrame together
        connection.write(ref.get());

        Log.logTrace("PREFACE_BYTES sent");
        Log.logTrace("Settings Frame sent");

        // send a Window update for the receive buffer we are using
        // minus the initial 64 K specified in protocol
        final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1);
        // A WINDOW_UPDATE must carry a strictly positive increment; when the
        // receive buffer is not larger than the protocol default there is
        // nothing to announce.
        if (len > 0) {
            windowUpdater.sendWindowUpdate(len);
        }
        Log.logTrace("finished sending connection preface");
        prefaceController.markPrefaceSent();
    }

    /**
     * Returns an existing Stream with given id, or null if doesn't exist
     */
    @SuppressWarnings("unchecked")
    <T> Stream<T> getStream(int streamid) {
        return (Stream<T>)streams.get(streamid);
    }

    /**
     * Creates a Stream for the given exchange. The stream has no id yet and
     * is not added to the stream map by this method.
     */
    <T> Stream<T> createStream(Exchange<T> exchange) {
        return new Stream<>(client, this, exchange, windowController);
    }

    /** Creates a pushed stream belonging to the parent's push group. */
    <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
        PushGroup<?,T> pg = parent.exchange.getPushGroup();
        return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
    }

    <T> void putStream(Stream<T> stream, int streamid) {
        streams.put(streamid, stream);
    }

    /** Unregisters the stream and its send window unconditionally. */
    void deleteStream(int streamid) {
        streams.remove(streamid);
        windowController.removeStream(streamid);
    }

    /**
     * Encode the headers into a List<ByteBuffer> and then create HEADERS
     * and CONTINUATION frames from the list and return the List<Http2Frame>.
     * The last frame produced carries END_HEADERS.
     */
    private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
        List<ByteBufferReference> buffers = encodeHeadersImpl(
                getMaxSendFrameSize(),
                frame.getAttachment().getRequestPseudoHeaders(),
                frame.getUserHeaders(),
                frame.getSystemHeaders());

        List<HeaderFrame> frames = new ArrayList<>(buffers.size());
        Iterator<ByteBufferReference> bufIterator = buffers.iterator();
        // encodeHeadersImpl always returns at least one buffer
        HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
        frames.add(oframe);
        while (bufIterator.hasNext()) {
            oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
            frames.add(oframe);
        }
        oframe.setFlag(HeaderFrame.END_HEADERS);
        return frames;
    }

    // Dedicated cache for headers encoding ByteBuffer.
    // There can be no concurrent access to this buffer as all access to this buffer
    // and its content happen within a single critical code block section protected
    // by the sendlock (see sendFrame()).
    private final ByteBufferPool headerEncodingPool = new ByteBufferPool();

    private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
        ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
        ref.get().limit(maxFrameSize);
        return ref;
    }

    /*
     * Encodes all the headers from the given HttpHeaders into the given List
     * of buffers.
     *
     * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
     *
     *     ...Just as in HTTP/1.x, header field names are strings of ASCII
     *     characters that are compared in a case-insensitive fashion. However,
     *     header field names MUST be converted to lowercase prior to their
     *     encoding in HTTP/2...
     */
    private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
        ByteBufferReference buffer = getHeaderBuffer(maxFrameSize);
        List<ByteBufferReference> buffers = new ArrayList<>();
        for (HttpHeaders header : headers) {
            for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
                // Locale.ROOT: the conversion must not depend on the default
                // locale (e.g. Turkish maps 'I' to a dotless i)
                String lKey = e.getKey().toLowerCase(Locale.ROOT);
                List<String> values = e.getValue();
                for (String value : values) {
                    hpackOut.header(lKey, value);
                    // encode() returns false while the current buffer is too
                    // small for the header: queue it and go on with a new one
                    while (!hpackOut.encode(buffer.get())) {
                        buffer.get().flip();
                        buffers.add(buffer);
                        buffer = getHeaderBuffer(maxFrameSize);
                    }
                }
            }
        }
        buffer.get().flip();
        buffers.add(buffer);
        return buffers;
    }

    /**
     * Assigns the stream's id to the outgoing headers, logs them if header
     * logging is enabled, and returns the encoded HEADERS/CONTINUATION frames.
     */
    private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
        oh.streamid(stream.streamid);
        if (Log.headers()) {
            StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
            sb.append(stream.streamid).append(")\n");
            Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders());
            Log.dumpHeaders(sb, " ", oh.getSystemHeaders());
            Log.dumpHeaders(sb, " ", oh.getUserHeaders());
            Log.logHeaders(sb.toString());
        }
        List<HeaderFrame> frames = encodeHeaders(oh);
        return encodeFrames(frames);
    }

    private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) {
        if (Log.frames()) {
            frames.forEach(f -> Log.logFrames(f, "OUT"));
        }
        return framesEncoder.encodeFrames(frames);
    }

    /**
     * Waits for the given future and returns null if it completed normally.
     * Otherwise returns the cause of the thrown exception if it has one,
     * else the thrown exception itself.
     */
    static Throwable getExceptionFrom(CompletableFuture<?> cf) {
        try {
            cf.get();
            return null;
        } catch (Throwable e) {
            Throwable cause = e.getCause();
            return cause != null ? cause : e;
        }
    }

    /**
     * Allocates the next local stream id for the stream attached to the
     * given headers and registers its send window. Called with sendlock held.
     */
    private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
        Stream<?> stream = oh.getAttachment();
        int streamid = nextstreamid;
        nextstreamid += 2;
        stream.registerStream(streamid);
        // set outgoing window here. This allows thread sending
        // body to proceed.
        windowController.registerStream(streamid, getInitialSendWindowSize());
        return stream;
    }

    private final Object sendlock = new Object();

    /**
     * Encodes and queues the given frame for writing. For OutgoingHeaders a
     * new stream id is allocated first; id allocation, header encoding and
     * queuing all happen under sendlock. An IOException shuts the connection
     * down unless it is already closed.
     */
    void sendFrame(Http2Frame frame) {
        try {
            synchronized (sendlock) {
                if (frame instanceof OutgoingHeaders) {
                    @SuppressWarnings("unchecked")
                    OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
                    Stream<?> stream = registerNewStream(oh);
                    // provide protection from inserting unordered frames between Headers and Continuation
                    connection.writeAsync(encodeHeaders(oh, stream));
                } else {
                    connection.writeAsync(encodeFrame(frame));
                }
            }
            connection.flushAsync();
        } catch (IOException e) {
            if (!closed) {
                Log.logError(e);
                shutdown(e);
            }
        }
    }

    private ByteBufferReference[] encodeFrame(Http2Frame frame) {
        Log.logFrames(frame, "OUT");
        return framesEncoder.encodeFrame(frame);
    }

    /**
     * Encodes and queues a DATA frame. Unlike sendFrame() this does not take
     * sendlock.
     * NOTE(review): confirm that callers never interleave this with a
     * HEADERS/CONTINUATION sequence being written by sendFrame().
     */
    void sendDataFrame(DataFrame frame) {
        try {
            connection.writeAsync(encodeFrame(frame));
            connection.flushAsync();
        } catch (IOException e) {
            if (!closed) {
                Log.logError(e);
                shutdown(e);
            }
        }
    }

    /*
     * Direct call of the method bypasses synchronization on "sendlock" and
     * is allowed only for control frames: WindowUpdateFrame, PingFrame etc.
     * It is prohibited for such frames as DataFrame, HeadersFrame,
     * ContinuationFrame.
     */
    void sendUnorderedFrame(Http2Frame frame) {
        try {
            connection.writeAsyncUnordered(encodeFrame(frame));
            connection.flushAsync();
        } catch (IOException e) {
            if (!closed) {
                Log.logError(e);
                shutdown(e);
            }
        }
    }

    /** DecodingCallback that collects decoded fields into an HttpHeadersImpl. */
    static class HeaderDecoder implements DecodingCallback {
        HttpHeadersImpl headers;

        HeaderDecoder() {
            this.headers = new HttpHeadersImpl();
        }

        @Override
        public void onDecoded(CharSequence name, CharSequence value) {
            headers.addHeader(name.toString(), value.toString());
        }

        HttpHeadersImpl headers() {
            return headers;
        }
    }

    /** Window update sender for the connection as a whole (stream id 0). */
    static final class ConnectionWindowUpdateSender extends WindowUpdateSender {

        public ConnectionWindowUpdateSender(Http2Connection connection,
                                            int initialWindowSize) {
            super(connection, initialWindowSize);
        }

        @Override
        int getStreamId() {
            return 0;
        }
    }
}