Http2Connection.java revision 16234:3b25414eb6af
1/*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http;
27
28import java.io.IOException;
29import java.net.InetSocketAddress;
30import java.net.URI;
31import jdk.incubator.http.HttpConnection.Mode;
32import java.nio.ByteBuffer;
33import java.nio.charset.StandardCharsets;
34import java.util.HashMap;
35import java.util.Iterator;
36import java.util.LinkedList;
37import java.util.List;
38import java.util.Map;
39import java.util.concurrent.CompletableFuture;
40import java.util.ArrayList;
41import java.util.Collections;
42import java.util.Formatter;
43import java.util.concurrent.ConcurrentHashMap;
44import java.util.concurrent.CountDownLatch;
45import java.util.stream.Collectors;
46import jdk.incubator.http.internal.common.*;
47import jdk.incubator.http.internal.frame.*;
48import jdk.incubator.http.internal.hpack.Encoder;
49import jdk.incubator.http.internal.hpack.Decoder;
50import jdk.incubator.http.internal.hpack.DecodingCallback;
51
52import static jdk.incubator.http.internal.frame.SettingsFrame.*;
53
54
55/**
56 * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
57 * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
58 *
59 * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
60 * to a HttpClientImpl.
61 *
62 * Creation cases:
63 * 1) upgraded HTTP/1.1 plain tcp connection
64 * 2) prior knowledge directly created plain tcp connection
65 * 3) directly created HTTP/2 SSL connection which uses ALPN.
66 *
67 * Sending is done by writing directly to underlying HttpConnection object which
68 * is operating in async mode. No flow control applies on output at this level
69 * and all writes are just executed as puts to an output Q belonging to HttpConnection
70 * Flow control is implemented by HTTP/2 protocol itself.
71 *
72 * Hpack header compression
73 * and outgoing stream creation is also done here, because these operations
74 * must be synchronized at the socket level. Stream objects send frames simply
75 * by placing them on the connection's output Queue. sendFrame() is called
76 * from a higher level (Stream) thread.
77 *
78 * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
79 * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
80 * or handles them directly itself. This thread performs hpack decompression
81 * and incoming stream creation (Server push). Incoming frames destined for a
82 * stream are provided by calling Stream.incoming().
83 */
84class Http2Connection  {
85
86
87    /*
88     *  ByteBuffer pooling strategy for HTTP/2 protocol:
89     *
     * In general there are 4 points where ByteBuffers are used:
     *  - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
     *    in case of an SSL connection.
93     *
     * 1. Outgoing frames encoded to ByteBuffers.
     *    Outgoing ByteBuffers are created with the required size and are frequently small (except DataFrames, etc.).
     *    No pool is used here at all. All outgoing buffers should be collected by GC.
97     *
98     * 2. Incoming ByteBuffers (decoded to frames).
99     *    Here, total elimination of BB pool is not a good idea.
100     *    We don't know how many bytes we will receive through network.
101     * So here we allocate buffer of reasonable size. The following life of the BB:
102     * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
103     *     BB is returned to pool,
104     * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
105     *     Such BB is never returned to pool and will be GCed.
106     * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
107     *     the buffer could be release to pool.
108     *
     * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers are taken from/returned to the pool,
     *    because we can't predict the size of encrypted packets.
111     *
112     */
113
114
115    // A small class that allows to control the state of
116    // the connection preface. This is just a thin wrapper
117    // over a CountDownLatch.
118    private final class PrefaceController {
119        volatile boolean prefaceSent;
120        private final CountDownLatch latch = new CountDownLatch(1);
121
122        // This method returns immediately if the preface is sent,
123        // and blocks until the preface is sent if not.
124        // In the common case this where the preface is already sent
125        // this will cost not more than a volatile read.
126        void waitUntilPrefaceSent() {
127            if (!prefaceSent) {
128                try {
129                    // If the preface is not sent then await on the latch
130                    Log.logTrace("Waiting until connection preface is sent");
131                    latch.await();
132                    Log.logTrace("Preface sent: resuming reading");
133                    assert prefaceSent;
134                 } catch (InterruptedException e) {
135                    String msg = Utils.stackTrace(e);
136                    Log.logTrace(msg);
137                    shutdown(e);
138                }
139            }
140        }
141
142        // Mark that the connection preface is sent
143        void markPrefaceSent() {
144            assert !prefaceSent;
145            prefaceSent = true;
146            // Release the latch. If asyncReceive was scheduled it will
147            // be waiting for the release and will be woken up by this
148            // call. If not, then the semaphore will no longer be used after
149            // this.
150            latch.countDown();
151        }
152
153        boolean isPrefaceSent() {
154            return prefaceSent;
155        }
156    }
157
    // Set to true by shutdown(); checked by the send methods so that an
    // IOException raised after shutdown is not reported a second time.
    volatile boolean closed;

    //-------------------------------------
    // The underlying connection all frames are written to and read from.
    final HttpConnection connection;
    private final HttpClientImpl client;
    private final Http2ClientImpl client2;
    // Streams currently registered on this connection, keyed by stream id.
    private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
    // Id assigned to the next locally registered stream; advanced by 2 in
    // registerNewStream().
    private int nextstreamid;
    // Id the next promised (pushed) stream is expected to carry; advanced
    // by 2 in handlePushPromise().
    private int nextPushStream = 2;
    // HPACK encoder for outgoing headers / decoder for incoming headers.
    private final Encoder hpackOut;
    private final Decoder hpackIn;
    final SettingsFrame clientSettings;
    // Starts as the default settings and is replaced by handleSettings()
    // whenever a non-ACK SETTINGS frame is received.
    private volatile SettingsFrame serverSettings;
    private final String key; // for HttpClientImpl.connections map
    // Decodes incoming buffers into frames, handing each to processFrame().
    private final FramesDecoder framesDecoder;
    private final FramesEncoder framesEncoder = new FramesEncoder();

    /**
     * Send Window controller for both connection and stream windows.
     * Each of this connection's Streams MUST use this controller.
     */
    private final WindowController windowController = new WindowController();
    private final PrefaceController prefaceController = new PrefaceController();
    // Sends WINDOW_UPDATE frames for the connection as a whole (stream 0).
    final WindowUpdateSender windowUpdater;

    // Frame size used by getMaxSendFrameSize() when the server settings do
    // not carry a MAX_FRAME_SIZE value.
    static final int DEFAULT_FRAME_SIZE = 16 * 1024;
184
185
186    // TODO: need list of control frames from other threads
187    // that need to be sent
188
189    private Http2Connection(HttpConnection connection,
190                            Http2ClientImpl client2,
191                            int nextstreamid,
192                            String key) {
193        this.connection = connection;
194        this.client = client2.client();
195        this.client2 = client2;
196        this.nextstreamid = nextstreamid;
197        this.key = key;
198        this.clientSettings = this.client2.getClientSettings();
199        this.framesDecoder = new FramesDecoder(this::processFrame, clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
200        // serverSettings will be updated by server
201        this.serverSettings = SettingsFrame.getDefaultSettings();
202        this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
203        this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
204        this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
205    }
206        /**
207         * Case 1) Create from upgraded HTTP/1.1 connection.
208         * Is ready to use. Will not be SSL. exchange is the Exchange
209         * that initiated the connection, whose response will be delivered
210         * on a Stream.
211         */
212    Http2Connection(HttpConnection connection,
213                    Http2ClientImpl client2,
214                    Exchange<?> exchange,
215                    ByteBuffer initial)
216        throws IOException, InterruptedException
217    {
218        this(connection,
219                client2,
220                3, // stream 1 is registered during the upgrade
221                keyFor(connection));
222        assert !(connection instanceof SSLConnection);
223        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
224
225        Stream<?> initialStream = createStream(exchange);
226        initialStream.registerStream(1);
227        windowController.registerStream(1, getInitialSendWindowSize());
228        initialStream.requestSent();
229        sendConnectionPreface();
230        // start reading and writing
231        // start reading
232        AsyncConnection asyncConn = (AsyncConnection)connection;
233        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
234        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
235        asyncReceive(ByteBufferReference.of(initial));
236        asyncConn.startReading();
237    }
238
239    // async style but completes immediately
240    static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
241                                                          Http2ClientImpl client2,
242                                                          Exchange<?> exchange,
243                                                          ByteBuffer initial) {
244        CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
245        try {
246            Http2Connection c = new Http2Connection(connection, client2, exchange, initial);
247            cf.complete(c);
248        } catch (IOException | InterruptedException e) {
249            cf.completeExceptionally(e);
250        }
251        return cf;
252    }
253
254    /**
255     * Cases 2) 3)
256     *
257     * request is request to be sent.
258     */
259    Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
260        throws IOException, InterruptedException
261    {
262        this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
263                h2client,
264                1,
265                keyFor(request.uri(), request.proxy(h2client.client())));
266        Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
267
268        connection.connect();
269        // start reading
270        AsyncConnection asyncConn = (AsyncConnection)connection;
271        asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
272        connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
273        asyncConn.startReading();
274        sendConnectionPreface();
275    }
276
277    static String keyFor(HttpConnection connection) {
278        boolean isProxy = connection.isProxied();
279        boolean isSecure = connection.isSecure();
280        InetSocketAddress addr = connection.address();
281
282        return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
283    }
284
285    static String keyFor(URI uri, InetSocketAddress proxy) {
286        boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
287        boolean isProxy = proxy != null;
288
289        String host;
290        int port;
291
292        if (isProxy) {
293            host = proxy.getHostString();
294            port = proxy.getPort();
295        } else {
296            host = uri.getHost();
297            port = uri.getPort();
298        }
299        return keyString(isSecure, isProxy, host, port);
300    }
301
302    // {C,S}:{H:P}:host:port
303    // C indicates clear text connection "http"
304    // S indicates secure "https"
305    // H indicates host (direct) connection
306    // P indicates proxy
307    // Eg: "S:H:foo.com:80"
308    static String keyString(boolean secure, boolean proxy, String host, int port) {
309        return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
310    }
311
312    String key() {
313        return this.key;
314    }
315
316    void putConnection() {
317        client2.putConnection(this);
318    }
319
320    private static String toHexdump1(ByteBuffer bb) {
321        bb.mark();
322        StringBuilder sb = new StringBuilder(512);
323        Formatter f = new Formatter(sb);
324
325        while (bb.hasRemaining()) {
326            int i =  Byte.toUnsignedInt(bb.get());
327            f.format("%02x:", i);
328        }
329        sb.deleteCharAt(sb.length()-1);
330        bb.reset();
331        return sb.toString();
332    }
333
334    private static String toHexdump(ByteBuffer bb) {
335        List<String> words = new ArrayList<>();
336        int i = 0;
337        bb.mark();
338        while (bb.hasRemaining()) {
339            if (i % 2 == 0) {
340                words.add("");
341            }
342            byte b = bb.get();
343            String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
344            words.set(i / 2, words.get(i / 2) + hex);
345            i++;
346        }
347        bb.reset();
348        return words.stream().collect(Collectors.joining(" "));
349    }
350
351    private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
352        boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
353
354        ByteBufferReference[] buffers = frame.getHeaderBlock();
355        for (int i = 0; i < buffers.length; i++) {
356            hpackIn.decode(buffers[i].get(), endOfHeaders && (i == buffers.length - 1), decoder);
357        }
358    }
359
360    int getInitialSendWindowSize() {
361        return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
362    }
363
364    void close() {
365        GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
366        // TODO: set last stream. For now zero ok.
367        sendFrame(f);
368    }
369
370    private ByteBufferPool readBufferPool = new ByteBufferPool();
371
372    // provides buffer to read data (default size)
373    public ByteBufferReference getReadBuffer() {
374        return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
375    }
376
377    private final Object readlock = new Object();
378
379    public void asyncReceive(ByteBufferReference buffer) {
380        // We don't need to read anything and
381        // we don't want to send anything back to the server
382        // until the connection preface has been sent.
383        // Therefore we're going to wait if needed before reading
384        // (and thus replying) to anything.
385        // Starting to reply to something (e.g send an ACK to a
386        // SettingsFrame sent by the server) before the connection
387        // preface is fully sent might result in the server
388        // sending a GOAWAY frame with 'invalid_preface'.
389        prefaceController.waitUntilPrefaceSent();
390        synchronized (readlock) {
391            assert prefaceController.isPrefaceSent();
392            try {
393                framesDecoder.decode(buffer);
394            } catch (Throwable e) {
395                String msg = Utils.stackTrace(e);
396                Log.logTrace(msg);
397                shutdown(e);
398            }
399        }
400    }
401
402
403    void shutdown(Throwable t) {
404        Log.logError(t);
405        closed = true;
406        client2.deleteConnection(this);
407        List<Stream<?>> c = new LinkedList<>(streams.values());
408        for (Stream<?> s : c) {
409            s.cancelImpl(t);
410        }
411        connection.close();
412    }
413
414    /**
415     * Handles stream 0 (common) frames that apply to whole connection and passes
416     * other stream specific frames to that Stream object.
417     *
418     * Invokes Stream.incoming() which is expected to process frame without
419     * blocking.
420     */
421    void processFrame(Http2Frame frame) throws IOException {
422        Log.logFrames(frame, "IN");
423        int streamid = frame.streamid();
424        if (frame instanceof MalformedFrame) {
425            Log.logError(((MalformedFrame) frame).getMessage());
426            if (streamid == 0) {
427                protocolError(((MalformedFrame) frame).getErrorCode());
428            } else {
429                resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
430            }
431            return;
432        }
433        if (streamid == 0) {
434            handleConnectionFrame(frame);
435        } else {
436            if (frame instanceof SettingsFrame) {
437                // The stream identifier for a SETTINGS frame MUST be zero
438                protocolError(GoAwayFrame.PROTOCOL_ERROR);
439                return;
440            }
441
442            Stream<?> stream = getStream(streamid);
443            if (stream == null) {
444                // Should never receive a frame with unknown stream id
445
446                // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
447                // response to a RST_STREAM frame.
448                if (!(frame instanceof ResetFrame)) {
449                    resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
450                }
451                return;
452            }
453            if (frame instanceof PushPromiseFrame) {
454                PushPromiseFrame pp = (PushPromiseFrame)frame;
455                handlePushPromise(stream, pp);
456            } else if (frame instanceof HeaderFrame) {
457                // decode headers (or continuation)
458                decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
459                stream.incoming(frame);
460            } else {
461                stream.incoming(frame);
462            }
463        }
464    }
465
466    private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
467        throws IOException
468    {
469        HttpRequestImpl parentReq = parent.request;
470        int promisedStreamid = pp.getPromisedStream();
471        if (promisedStreamid != nextPushStream) {
472            resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
473            return;
474        } else {
475            nextPushStream += 2;
476        }
477        HeaderDecoder decoder = new HeaderDecoder();
478        decodeHeaders(pp, decoder);
479        HttpHeadersImpl headers = decoder.headers();
480        HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
481        Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
482        Stream.PushedStream<?,T> pushStream = createPushStream(parent, pushExch);
483        pushExch.exchImpl = pushStream;
484        pushStream.registerStream(promisedStreamid);
485        parent.incoming_pushPromise(pushReq, pushStream);
486    }
487
488    private void handleConnectionFrame(Http2Frame frame)
489        throws IOException
490    {
491        switch (frame.type()) {
492          case SettingsFrame.TYPE:
493              handleSettings((SettingsFrame)frame);
494              break;
495          case PingFrame.TYPE:
496              handlePing((PingFrame)frame);
497              break;
498          case GoAwayFrame.TYPE:
499              handleGoAway((GoAwayFrame)frame);
500              break;
501          case WindowUpdateFrame.TYPE:
502              handleWindowUpdate((WindowUpdateFrame)frame);
503              break;
504          default:
505            protocolError(ErrorFrame.PROTOCOL_ERROR);
506        }
507    }
508
509    void resetStream(int streamid, int code) throws IOException {
510        Log.logError(
511            "Resetting stream {0,number,integer} with error code {1,number,integer}",
512            streamid, code);
513        ResetFrame frame = new ResetFrame(streamid, code);
514        sendFrame(frame);
515        closeStream(streamid);
516    }
517
518    void closeStream(int streamid) {
519        Stream<?> s = streams.remove(streamid);
520        // ## Remove s != null. It is a hack for delayed cancellation,reset
521        if (s != null && !(s instanceof Stream.PushedStream)) {
522            // Since PushStreams have no request body, then they have no
523            // corresponding entry in the window controller.
524            windowController.removeStream(streamid);
525        }
526    }
527    /**
528     * Increments this connection's send Window by the amount in the given frame.
529     */
530    private void handleWindowUpdate(WindowUpdateFrame f)
531        throws IOException
532    {
533        int amount = f.getUpdate();
534        if (amount <= 0) {
535            // ## temporarily disable to workaround a bug in Jetty where it
536            // ## sends Window updates with a 0 update value.
537            //protocolError(ErrorFrame.PROTOCOL_ERROR);
538        } else {
539            boolean success = windowController.increaseConnectionWindow(amount);
540            if (!success) {
541                protocolError(ErrorFrame.FLOW_CONTROL_ERROR);  // overflow
542            }
543        }
544    }
545
546    private void protocolError(int errorCode)
547        throws IOException
548    {
549        GoAwayFrame frame = new GoAwayFrame(0, errorCode);
550        sendFrame(frame);
551        shutdown(new IOException("protocol error"));
552    }
553
554    private void handleSettings(SettingsFrame frame)
555        throws IOException
556    {
557        assert frame.streamid() == 0;
558        if (!frame.getFlag(SettingsFrame.ACK)) {
559            int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
560            int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
561            int diff = newWindowSize - oldWindowSize;
562            if (diff != 0) {
563                windowController.adjustActiveStreams(diff);
564            }
565            serverSettings = frame;
566            sendFrame(new SettingsFrame(SettingsFrame.ACK));
567        }
568    }
569
570    private void handlePing(PingFrame frame)
571        throws IOException
572    {
573        frame.setFlag(PingFrame.ACK);
574        sendUnorderedFrame(frame);
575    }
576
577    private void handleGoAway(GoAwayFrame frame)
578        throws IOException
579    {
580        shutdown(new IOException(
581                        String.valueOf(connection.channel().getLocalAddress())
582                        +": GOAWAY received"));
583    }
584
585    /**
586     * Max frame size we are allowed to send
587     */
588    public int getMaxSendFrameSize() {
589        int param = serverSettings.getParameter(MAX_FRAME_SIZE);
590        if (param == -1) {
591            param = DEFAULT_FRAME_SIZE;
592        }
593        return param;
594    }
595
596    /**
597     * Max frame size we will receive
598     */
599    public int getMaxReceiveFrameSize() {
600        return clientSettings.getParameter(MAX_FRAME_SIZE);
601    }
602
603    // Not sure how useful this is.
604    public int getMaxHeadersSize() {
605        return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
606    }
607
608    private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
609
610    private static final byte[] PREFACE_BYTES =
611        CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
612
613    /**
614     * Sends Connection preface and Settings frame with current preferred
615     * values
616     */
617    private void sendConnectionPreface() throws IOException {
618        Log.logTrace("{0}: start sending connection preface to {1}",
619                     connection.channel().getLocalAddress(),
620                     connection.address());
621        SettingsFrame sf = client2.getClientSettings();
622        ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
623        Log.logFrames(sf, "OUT");
624        // send preface bytes and SettingsFrame together
625        connection.write(ref.get());
626
627        Log.logTrace("PREFACE_BYTES sent");
628        Log.logTrace("Settings Frame sent");
629
630        // send a Window update for the receive buffer we are using
631        // minus the initial 64 K specified in protocol
632        final int len = client2.client().getReceiveBufferSize() - (64 * 1024 - 1);
633        windowUpdater.sendWindowUpdate(len);
634        Log.logTrace("finished sending connection preface");
635        prefaceController.markPrefaceSent();
636    }
637
638    /**
639     * Returns an existing Stream with given id, or null if doesn't exist
640     */
641    @SuppressWarnings("unchecked")
642    <T> Stream<T> getStream(int streamid) {
643        return (Stream<T>)streams.get(streamid);
644    }
645
646    /**
647     * Creates Stream with given id.
648     */
649    <T> Stream<T> createStream(Exchange<T> exchange) {
650        Stream<T> stream = new Stream<>(client, this, exchange, windowController);
651        return stream;
652    }
653
654    <T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
655        PushGroup<?,T> pg = parent.exchange.getPushGroup();
656        return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
657    }
658
659    <T> void putStream(Stream<T> stream, int streamid) {
660        streams.put(streamid, stream);
661    }
662
663    void deleteStream(int streamid) {
664        streams.remove(streamid);
665        windowController.removeStream(streamid);
666    }
667
668    /**
669     * Encode the headers into a List<ByteBuffer> and then create HEADERS
670     * and CONTINUATION frames from the list and return the List<Http2Frame>.
671     */
672    private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
673        List<ByteBufferReference> buffers = encodeHeadersImpl(
674                getMaxSendFrameSize(),
675                frame.getAttachment().getRequestPseudoHeaders(),
676                frame.getUserHeaders(),
677                frame.getSystemHeaders());
678
679        List<HeaderFrame> frames = new ArrayList<>(buffers.size());
680        Iterator<ByteBufferReference> bufIterator = buffers.iterator();
681        HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
682        frames.add(oframe);
683        while(bufIterator.hasNext()) {
684            oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
685            frames.add(oframe);
686        }
687        oframe.setFlag(HeaderFrame.END_HEADERS);
688        return frames;
689    }
690
691    // Dedicated cache for headers encoding ByteBuffer.
692    // There can be no concurrent access to this  buffer as all access to this buffer
693    // and its content happen within a single critical code block section protected
694    // by the sendLock. / (see sendFrame())
695    private ByteBufferPool headerEncodingPool = new ByteBufferPool();
696
697    private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
698        ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
699        ref.get().limit(maxFrameSize);
700        return ref;
701    }
702
703    /*
704     * Encodes all the headers from the given HttpHeaders into the given List
705     * of buffers.
706     *
707     * From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
708     *
709     *     ...Just as in HTTP/1.x, header field names are strings of ASCII
710     *     characters that are compared in a case-insensitive fashion.  However,
711     *     header field names MUST be converted to lowercase prior to their
712     *     encoding in HTTP/2...
713     */
714    private List<ByteBufferReference> encodeHeadersImpl(int maxFrameSize, HttpHeaders... headers) {
715        ByteBufferReference buffer = getHeaderBuffer(maxFrameSize);
716        List<ByteBufferReference> buffers = new ArrayList<>();
717        for(HttpHeaders header : headers) {
718            for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
719                String lKey = e.getKey().toLowerCase();
720                List<String> values = e.getValue();
721                for (String value : values) {
722                    hpackOut.header(lKey, value);
723                    while (!hpackOut.encode(buffer.get())) {
724                        buffer.get().flip();
725                        buffers.add(buffer);
726                        buffer =  getHeaderBuffer(maxFrameSize);
727                    }
728                }
729            }
730        }
731        buffer.get().flip();
732        buffers.add(buffer);
733        return buffers;
734    }
735
736    private ByteBufferReference[] encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
737        oh.streamid(stream.streamid);
738        if (Log.headers()) {
739            StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
740            sb.append(stream.streamid).append(")\n");
741            Log.dumpHeaders(sb, "    ", oh.getAttachment().getRequestPseudoHeaders());
742            Log.dumpHeaders(sb, "    ", oh.getSystemHeaders());
743            Log.dumpHeaders(sb, "    ", oh.getUserHeaders());
744            Log.logHeaders(sb.toString());
745        }
746        List<HeaderFrame> frames = encodeHeaders(oh);
747        return encodeFrames(frames);
748    }
749
750    private ByteBufferReference[] encodeFrames(List<HeaderFrame> frames) {
751        if (Log.frames()) {
752            frames.forEach(f -> Log.logFrames(f, "OUT"));
753        }
754        return framesEncoder.encodeFrames(frames);
755    }
756
757    static Throwable getExceptionFrom(CompletableFuture<?> cf) {
758        try {
759            cf.get();
760            return null;
761        } catch (Throwable e) {
762            if (e.getCause() != null) {
763                return e.getCause();
764            } else {
765                return e;
766            }
767        }
768    }
769
770    private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
771        Stream<?> stream = oh.getAttachment();
772        int streamid = nextstreamid;
773        nextstreamid += 2;
774        stream.registerStream(streamid);
775        // set outgoing window here. This allows thread sending
776        // body to proceed.
777        windowController.registerStream(streamid, getInitialSendWindowSize());
778        return stream;
779    }
780
781    private final Object sendlock = new Object();
782
783    void sendFrame(Http2Frame frame) {
784        try {
785            synchronized (sendlock) {
786                if (frame instanceof OutgoingHeaders) {
787                    @SuppressWarnings("unchecked")
788                    OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
789                    Stream<?> stream = registerNewStream(oh);
790                    // provide protection from inserting unordered frames between Headers and Continuation
791                    connection.writeAsync(encodeHeaders(oh, stream));
792                } else {
793                    connection.writeAsync(encodeFrame(frame));
794                }
795            }
796            connection.flushAsync();
797        } catch (IOException e) {
798            if (!closed) {
799                Log.logError(e);
800                shutdown(e);
801            }
802        }
803    }
804
805    private ByteBufferReference[] encodeFrame(Http2Frame frame) {
806        Log.logFrames(frame, "OUT");
807        return framesEncoder.encodeFrame(frame);
808    }
809
810    void sendDataFrame(DataFrame frame) {
811        try {
812            connection.writeAsync(encodeFrame(frame));
813            connection.flushAsync();
814        } catch (IOException e) {
815            if (!closed) {
816                Log.logError(e);
817                shutdown(e);
818            }
819        }
820    }
821
822    /*
823     * Direct call of the method bypasses synchronization on "sendlock" and
824     * allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
825     * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
826     */
827    void sendUnorderedFrame(Http2Frame frame) {
828        try {
829            connection.writeAsyncUnordered(encodeFrame(frame));
830            connection.flushAsync();
831        } catch (IOException e) {
832            if (!closed) {
833                Log.logError(e);
834                shutdown(e);
835            }
836        }
837    }
838
839    static class HeaderDecoder implements DecodingCallback {
840        HttpHeadersImpl headers;
841
842        HeaderDecoder() {
843            this.headers = new HttpHeadersImpl();
844        }
845
846        @Override
847        public void onDecoded(CharSequence name, CharSequence value) {
848            headers.addHeader(name.toString(), value.toString());
849        }
850
851        HttpHeadersImpl headers() {
852            return headers;
853        }
854    }
855
856    static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
857
858        public ConnectionWindowUpdateSender(Http2Connection connection,
859                                            int initialWindowSize) {
860            super(connection, initialWindowSize);
861        }
862
863        @Override
864        int getStreamId() {
865            return 0;
866        }
867    }
868}
869