Stream.java revision 17210:ea0146845b79
1/*
2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http;
27
28import java.io.IOException;
29import java.net.URI;
30import java.nio.ByteBuffer;
31import java.util.ArrayList;
32import java.util.List;
33import java.util.Optional;
34import java.util.concurrent.CompletableFuture;
35import java.util.concurrent.CompletionException;
36import java.util.concurrent.ExecutionException;
37import java.util.concurrent.Executor;
38import java.util.concurrent.Flow;
39import java.util.concurrent.Flow.Subscription;
40import java.util.concurrent.TimeUnit;
41import java.util.concurrent.TimeoutException;
42import java.util.function.Consumer;
43
44import jdk.incubator.http.internal.common.*;
45import jdk.incubator.http.internal.frame.*;
46import jdk.incubator.http.internal.hpack.DecodingCallback;
47
48/**
49 * Http/2 Stream handling.
50 *
51 * REQUESTS
52 *
53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
54 *
55 * sendRequest() -- sendHeadersOnly() + sendBody()
56 *
57 * sendBody() -- in calling thread: obeys all flow control (so may block)
58 *               obtains data from request body processor and places on connection
59 *               outbound Q.
60 *
61 * sendBodyAsync() -- calls sendBody() in an executor thread.
62 *
63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
64 *
65 * sendRequestAsync() -- calls sendRequest() in an executor thread
66 *
67 * RESPONSES
68 *
69 * Multiple responses can be received per request. Responses are queued up on
70 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
71 * with the next response
72 *
73 * getResponseAsync() -- queries list of response CFs and returns first one
74 *               if one exists. Otherwise, creates one and adds it to list
75 *               and returns it. Completion is achieved through the
76 *               incoming() upcall from connection reader thread.
77 *
78 * getResponse() -- calls getResponseAsync() and waits for CF to complete
79 *
80 * responseBody() -- in calling thread: blocks for incoming DATA frames on
81 *               stream inputQ. Obeys remote and local flow control so may block.
82 *               Calls user response body processor with data buffers.
83 *
84 * responseBodyAsync() -- calls responseBody() in an executor thread.
85 *
86 * incoming() -- entry point called from connection reader thread. Frames are
87 *               either handled immediately without blocking or for data frames
88 *               placed on the stream's inputQ which is consumed by the stream's
89 *               reader thread.
90 *
91 * PushedStream sub class
92 * ======================
93 * Sending side methods are not used because the request comes from a PUSH_PROMISE
94 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
95 * is created. PushedStream does not use responseCF list as there can be only
96 * one response. The CF is created when the object created and when the response
97 * HEADERS frame is received the object is completed.
98 */
99class Stream<T> extends ExchangeImpl<T> {
100
101    final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
102
103    /**
104     * This stream's identifier. Assigned lazily by the HTTP2Connection before
105     * the stream's first frame is sent.
106     */
107    protected volatile int streamid;
108
109    long responseContentLen = -1;
110    long responseBytesProcessed = 0;
111    long requestContentLen;
112
113    final Http2Connection connection;
114    HttpClientImpl client;
115    final HttpRequestImpl request;
116    final DecodingCallback rspHeadersConsumer;
117    HttpHeadersImpl responseHeaders;
118    final HttpHeadersImpl requestHeaders;
119    final HttpHeadersImpl requestPseudoHeaders;
120    HttpResponse.BodyProcessor<T> responseProcessor;
121    final HttpRequest.BodyProcessor requestProcessor;
122    volatile int responseCode;
123    volatile Response response;
124    volatile CompletableFuture<Response> responseCF;
125    final AbstractPushPublisher<ByteBuffer> publisher;
126    final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
127
128    /** True if END_STREAM has been seen in a frame received on this stream. */
129    private volatile boolean remotelyClosed;
130    private volatile boolean closed;
131    private volatile boolean endStreamSent;
132
133    // state flags
134    boolean requestSent, responseReceived, responseHeadersReceived;
135
136    /**
137     * A reference to this Stream's connection Send Window controller. The
138     * stream MUST acquire the appropriate amount of Send Window before
139     * sending any data. Will be null for PushStreams, as they cannot send data.
140     */
141    private final WindowController windowController;
142    private final WindowUpdateSender windowUpdater;
143
144    @Override
145    HttpConnection connection() {
146        return connection.connection;
147    }
148
149    @Override
150    CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
151                                       boolean returnConnectionToPool,
152                                       Executor executor)
153    {
154        Log.logTrace("Reading body on stream {0}", streamid);
155        responseProcessor = handler.apply(responseCode, responseHeaders);
156        setClientForResponse(responseProcessor);
157        publisher.subscribe(responseProcessor);
158        CompletableFuture<T> cf = receiveData(executor);
159
160        PushGroup<?,?> pg = exchange.getPushGroup();
161        if (pg != null) {
162            // if an error occurs make sure it is recorded in the PushGroup
163            cf = cf.whenComplete((t,e) -> pg.pushError(e));
164        }
165        return cf;
166    }
167
168    @Override
169    T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool)
170        throws IOException
171    {
172        CompletableFuture<T> cf = readBodyAsync(handler,
173                                                returnConnectionToPool,
174                                                null);
175        try {
176            return cf.join();
177        } catch (CompletionException e) {
178            throw Utils.getIOException(e);
179        }
180    }
181
182    @Override
183    public String toString() {
184        StringBuilder sb = new StringBuilder();
185        sb.append("streamid: ")
186                .append(streamid);
187        return sb.toString();
188    }
189
190    private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
191        if (frame instanceof ResetFrame) {
192            handleReset((ResetFrame) frame);
193            return true;
194        } else if (!(frame instanceof DataFrame)) {
195            assert false;
196            return true;
197        }
198        DataFrame df = (DataFrame) frame;
199        // RFC 7540 6.1:
200        // The entire DATA frame payload is included in flow control,
201        // including the Pad Length and Padding fields if present
202        int len = df.payloadLength();
203        ByteBufferReference[] buffers = df.getData();
204        for (ByteBufferReference b : buffers) {
205            ByteBuffer buf = b.get();
206            if (buf.hasRemaining()) {
207                publisher.acceptData(Optional.of(buf));
208            }
209        }
210        connection.windowUpdater.update(len);
211        if (df.getFlag(DataFrame.END_STREAM)) {
212            setEndStreamReceived();
213            publisher.acceptData(Optional.empty());
214            return false;
215        }
216        // Don't send window update on a stream which is
217        // closed or half closed.
218        windowUpdater.update(len);
219        return true;
220    }
221
222    // pushes entire response body into response processor
223    // blocking when required by local or remote flow control
224    CompletableFuture<T> receiveData(Executor executor) {
225        CompletableFuture<T> cf = responseProcessor
226                .getBody()
227                .toCompletableFuture();
228        Consumer<Throwable> onError = e -> {
229            Log.logTrace("receiveData: {0}", e.toString());
230            e.printStackTrace();
231            cf.completeExceptionally(e);
232            publisher.acceptError(e);
233        };
234        if (executor == null) {
235            inputQ.blockingReceive(this::receiveDataFrame, onError);
236        } else {
237            inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
238        }
239        return cf;
240    }
241
242    @Override
243    void sendBody() throws IOException {
244        try {
245            sendBodyImpl().join();
246        } catch (CompletionException e) {
247            throw Utils.getIOException(e);
248        }
249    }
250
251    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
252        return sendBodyImpl().thenApply( v -> this);
253    }
254
255    @SuppressWarnings("unchecked")
256    Stream(HttpClientImpl client,
257           Http2Connection connection,
258           Exchange<T> e,
259           WindowController windowController)
260    {
261        super(e);
262        this.client = client;
263        this.connection = connection;
264        this.windowController = windowController;
265        this.request = e.request();
266        this.requestProcessor = request.requestProcessor;
267        responseHeaders = new HttpHeadersImpl();
268        requestHeaders = new HttpHeadersImpl();
269        rspHeadersConsumer = (name, value) -> {
270            responseHeaders.addHeader(name.toString(), value.toString());
271            if (Log.headers() && Log.trace()) {
272                Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
273                             streamid, name, value);
274            }
275        };
276        this.requestPseudoHeaders = new HttpHeadersImpl();
277        // NEW
278        this.publisher = new BlockingPushPublisher<>();
279        this.windowUpdater = new StreamWindowUpdateSender(connection);
280    }
281
282    /**
283     * Entry point from Http2Connection reader thread.
284     *
285     * Data frames will be removed by response body thread.
286     */
287    void incoming(Http2Frame frame) throws IOException {
288        if ((frame instanceof HeaderFrame)) {
289            HeaderFrame hframe = (HeaderFrame)frame;
290            if (hframe.endHeaders()) {
291                Log.logTrace("handling response (streamid={0})", streamid);
292                handleResponse();
293                if (hframe.getFlag(HeaderFrame.END_STREAM)) {
294                    inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]));
295                }
296            }
297        } else if (frame instanceof DataFrame) {
298            inputQ.put(frame);
299        } else {
300            otherFrame(frame);
301        }
302    }
303
304    void otherFrame(Http2Frame frame) throws IOException {
305        switch (frame.type()) {
306            case WindowUpdateFrame.TYPE:
307                incoming_windowUpdate((WindowUpdateFrame) frame);
308                break;
309            case ResetFrame.TYPE:
310                incoming_reset((ResetFrame) frame);
311                break;
312            case PriorityFrame.TYPE:
313                incoming_priority((PriorityFrame) frame);
314                break;
315            default:
316                String msg = "Unexpected frame: " + frame.toString();
317                throw new IOException(msg);
318        }
319    }
320
321    // The Hpack decoder decodes into one of these consumers of name,value pairs
322
323    DecodingCallback rspHeadersConsumer() {
324        return rspHeadersConsumer;
325    }
326
327    protected void handleResponse() throws IOException {
328        synchronized(this) {
329            responseHeadersReceived = true;
330        }
331        HttpConnection c = connection.connection; // TODO: improve
332        responseCode = (int)responseHeaders
333                .firstValueAsLong(":status")
334                .orElseThrow(() -> new IOException("no statuscode in response"));
335
336        response = new Response(
337                request, exchange, responseHeaders,
338                responseCode, HttpClient.Version.HTTP_2);
339
340        this.responseContentLen = responseHeaders
341                .firstValueAsLong("content-length")
342                .orElse(-1L);
343
344        if (Log.headers()) {
345            StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
346            Log.dumpHeaders(sb, "    ", responseHeaders);
347            Log.logHeaders(sb.toString());
348        }
349
350        completeResponse(response);
351    }
352
353    void incoming_reset(ResetFrame frame) throws IOException {
354        Log.logTrace("Received RST_STREAM on stream {0}", streamid);
355        if (endStreamReceived()) {
356            Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
357        } else if (closed) {
358            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
359        } else {
360            boolean pushedToQueue = false;
361            synchronized(this) {
362                // if the response headers are not yet
363                // received, or the inputQueue is closed, handle reset directly.
364                // Otherwise, put it in the input queue in order to read all
365                // pending data frames first. Indeed, a server may send
366                // RST_STREAM after sending END_STREAM, in which case we should
367                // ignore it. However, we won't know if we have received END_STREAM
368                // or not until all pending data frames are read.
369                // Because the inputQ will not be read until the response
370                // headers are received, and because response headers won't be
371                // sent if the server sent RST_STREAM, then we must handle
372                // reset here directly unless responseHeadersReceived is true.
373                pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame);
374            }
375            if (!pushedToQueue) {
376                // RST_STREAM was not pushed to the queue: handle it.
377                try {
378                    handleReset(frame);
379                } catch (IOException io) {
380                    completeResponseExceptionally(io);
381                }
382            } else {
383                // RST_STREAM was pushed to the queue. It will be handled by
384                // asyncReceive after all pending data frames have been
385                // processed.
386                Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
387            }
388        }
389    }
390
391    void handleReset(ResetFrame frame) throws IOException {
392        Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
393        if (!closed) {
394            close();
395            int error = frame.getErrorCode();
396            throw new IOException(ErrorFrame.stringForCode(error));
397        } else {
398            Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
399        }
400    }
401
402    void incoming_priority(PriorityFrame frame) {
403        // TODO: implement priority
404        throw new UnsupportedOperationException("Not implemented");
405    }
406
407    private void incoming_windowUpdate(WindowUpdateFrame frame)
408        throws IOException
409    {
410        int amount = frame.getUpdate();
411        if (amount <= 0) {
412            Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
413                         streamid, streamid, amount);
414            connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
415        } else {
416            assert streamid != 0;
417            boolean success = windowController.increaseStreamWindow(amount, streamid);
418            if (!success) {  // overflow
419                connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
420            }
421        }
422    }
423
424    void incoming_pushPromise(HttpRequestImpl pushReq,
425                              PushedStream<?,T> pushStream)
426        throws IOException
427    {
428        if (Log.requests()) {
429            Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
430        }
431        PushGroup<?,T> pushGroup = exchange.getPushGroup();
432        if (pushGroup == null || pushGroup.noMorePushes()) {
433            cancelImpl(new IllegalStateException("unexpected push promise"
434                + " on stream " + streamid));
435        }
436
437        HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor();
438
439        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
440
441        Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest(
442                pushReq);
443
444        if (!bpOpt.isPresent()) {
445            IOException ex = new IOException("Stream "
446                 + streamid + " cancelled by user");
447            if (Log.trace()) {
448                Log.logTrace("No body processor for {0}: {1}", pushReq,
449                            ex.getMessage());
450            }
451            pushStream.cancelImpl(ex);
452            cf.completeExceptionally(ex);
453            return;
454        }
455
456        pushGroup.addPush();
457        pushStream.requestSent();
458        pushStream.setPushHandler(bpOpt.get());
459        // setup housekeeping for when the push is received
460        // TODO: deal with ignoring of CF anti-pattern
461        cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
462            if (Log.trace()) {
463                Log.logTrace("Push completed on stream {0} for {1}{2}",
464                             pushStream.streamid, resp,
465                             ((t==null) ? "": " with exception " + t));
466            }
467            if (t != null) {
468                pushGroup.pushError(t);
469                proc.onError(pushReq, t);
470            } else {
471                proc.onResponse(resp);
472            }
473            pushGroup.pushCompleted();
474        });
475
476    }
477
478    private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
479        HttpHeadersImpl h = request.getSystemHeaders();
480        if (contentLength > 0) {
481            h.setHeader("content-length", Long.toString(contentLength));
482        }
483        setPseudoHeaderFields();
484        OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
485        if (contentLength == 0) {
486            f.setFlag(HeadersFrame.END_STREAM);
487            endStreamSent = true;
488        }
489        return f;
490    }
491
492    private void setPseudoHeaderFields() {
493        HttpHeadersImpl hdrs = requestPseudoHeaders;
494        String method = request.method();
495        hdrs.setHeader(":method", method);
496        URI uri = request.uri();
497        hdrs.setHeader(":scheme", uri.getScheme());
498        // TODO: userinfo deprecated. Needs to be removed
499        hdrs.setHeader(":authority", uri.getAuthority());
500        // TODO: ensure header names beginning with : not in user headers
501        String query = uri.getQuery();
502        String path = uri.getPath();
503        if (path == null || path.isEmpty()) {
504            if (method.equalsIgnoreCase("OPTIONS")) {
505                path = "*";
506            } else {
507                path = "/";
508            }
509        }
510        if (query != null) {
511            path += "?" + query;
512        }
513        hdrs.setHeader(":path", path);
514    }
515
516    HttpHeadersImpl getRequestPseudoHeaders() {
517        return requestPseudoHeaders;
518    }
519
520    @Override
521    Response getResponse() throws IOException {
522        try {
523            if (request.duration() != null) {
524                Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)",
525                             streamid,
526                             request.duration().toMillis());
527                return getResponseAsync(null).get(
528                        request.duration().toMillis(), TimeUnit.MILLISECONDS);
529            } else {
530                Log.logTrace("Waiting for response (streamid={0})", streamid);
531                return getResponseAsync(null).join();
532            }
533        } catch (TimeoutException e) {
534            Log.logTrace("Response timeout (streamid={0})", streamid);
535            throw new HttpTimeoutException("Response timed out");
536        } catch (InterruptedException | ExecutionException | CompletionException e) {
537            Throwable t = e.getCause();
538            Log.logTrace("Response failed (streamid={0}): {1}", streamid, t);
539            if (t instanceof IOException) {
540                throw (IOException)t;
541            }
542            throw new IOException(e);
543        } finally {
544            Log.logTrace("Got response or failed (streamid={0})", streamid);
545        }
546    }
547
548    /** Sets endStreamReceived. Should be called only once. */
549    void setEndStreamReceived() {
550        assert remotelyClosed == false: "Unexpected endStream already set";
551        remotelyClosed = true;
552        responseReceived();
553    }
554
555    /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
556     *  received on this stream. */
557    private boolean endStreamReceived() {
558        return remotelyClosed;
559    }
560
561    @Override
562    void sendHeadersOnly() throws IOException, InterruptedException {
563        if (Log.requests() && request != null) {
564            Log.logRequest(request.toString());
565        }
566        requestContentLen = requestProcessor.contentLength();
567        OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
568        connection.sendFrame(f);
569    }
570
571    void registerStream(int id) {
572        this.streamid = id;
573        connection.putStream(this, streamid);
574    }
575
576    class RequestSubscriber
577        extends RequestProcessors.ProcessorBase
578        implements Flow.Subscriber<ByteBuffer>
579    {
580        // can be < 0 if the actual length is not known.
581        private volatile long remainingContentLength;
582        private volatile Subscription subscription;
583
584        RequestSubscriber(long contentLen) {
585            this.remainingContentLength = contentLen;
586        }
587
588        @Override
589        public void onSubscribe(Flow.Subscription subscription) {
590            if (this.subscription != null) {
591                throw new IllegalStateException();
592            }
593            this.subscription = subscription;
594            subscription.request(1);
595        }
596
597        @Override
598        public void onNext(ByteBuffer item) {
599            if (requestBodyCF.isDone()) {
600                throw new IllegalStateException();
601            }
602
603            try {
604                while (item.hasRemaining()) {
605                    assert !endStreamSent : "internal error, send data after END_STREAM flag";
606                    DataFrame df = getDataFrame(item);
607                    if (remainingContentLength > 0) {
608                        remainingContentLength -= df.getDataLength();
609                        assert remainingContentLength >= 0;
610                        if (remainingContentLength == 0) {
611                            df.setFlag(DataFrame.END_STREAM);
612                            endStreamSent = true;
613                        }
614                    }
615                    connection.sendDataFrame(df);
616                }
617                subscription.request(1);
618            } catch (InterruptedException ex) {
619                subscription.cancel();
620                requestBodyCF.completeExceptionally(ex);
621            }
622        }
623
624        @Override
625        public void onError(Throwable throwable) {
626            if (requestBodyCF.isDone()) {
627                return;
628            }
629            subscription.cancel();
630            requestBodyCF.completeExceptionally(throwable);
631        }
632
633        @Override
634        public void onComplete() {
635            assert endStreamSent || remainingContentLength < 0;
636            try {
637                if (!endStreamSent) {
638                    endStreamSent = true;
639                    connection.sendDataFrame(getEmptyEndStreamDataFrame());
640                }
641                requestBodyCF.complete(null);
642            } catch (InterruptedException ex) {
643                requestBodyCF.completeExceptionally(ex);
644            }
645        }
646    }
647
648    DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException {
649        int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
650        // blocks waiting for stream send window, if exhausted
651        int actualAmount = windowController.tryAcquire(requestAmount, streamid);
652        ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
653        DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf));
654        return df;
655    }
656
657    private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException {
658        return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]);
659    }
660
661    /**
662     * A List of responses relating to this stream. Normally there is only
663     * one response, but intermediate responses like 100 are allowed
664     * and must be passed up to higher level before continuing. Deals with races
665     * such as if responses are returned before the CFs get created by
666     * getResponseAsync()
667     */
668
669    final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
670
671    @Override
672    CompletableFuture<Response> getResponseAsync(Executor executor) {
673        CompletableFuture<Response> cf = null;
674        // The code below deals with race condition that can be caused when
675        // completeResponse() is being called before getResponseAsync()
676        synchronized (response_cfs) {
677            if (!response_cfs.isEmpty()) {
678                // This CompletableFuture was created by completeResponse().
679                // it will be already completed.
680                cf = response_cfs.remove(0);
681                // if we find a cf here it should be already completed.
682                // finding a non completed cf should not happen. just assert it.
683                assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
684            } else {
685                // getResponseAsync() is called first. Create a CompletableFuture
686                // that will be completed by completeResponse() when
687                // completeResponse() is called.
688                cf = new MinimalFuture<>();
689                response_cfs.add(cf);
690            }
691        }
692        if (executor != null && !cf.isDone()) {
693            // protect from executing later chain of CompletableFuture operations from SelectorManager thread
694            cf = cf.thenApplyAsync(r -> r, executor);
695        }
696        Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
697        PushGroup<?,?> pg = exchange.getPushGroup();
698        if (pg != null) {
699            // if an error occurs make sure it is recorded in the PushGroup
700            cf = cf.whenComplete((t,e) -> pg.pushError(e));
701        }
702        return cf;
703    }
704
705    /**
706     * Completes the first uncompleted CF on list, and removes it. If there is no
707     * uncompleted CF then creates one (completes it) and adds to list
708     */
709    void completeResponse(Response resp) {
710        synchronized (response_cfs) {
711            CompletableFuture<Response> cf;
712            int cfs_len = response_cfs.size();
713            for (int i=0; i<cfs_len; i++) {
714                cf = response_cfs.get(i);
715                if (!cf.isDone()) {
716                    Log.logTrace("Completing response (streamid={0}): {1}",
717                                 streamid, cf);
718                    cf.complete(resp);
719                    response_cfs.remove(cf);
720                    return;
721                } // else we found the previous response: just leave it alone.
722            }
723            cf = MinimalFuture.completedFuture(resp);
724            Log.logTrace("Created completed future (streamid={0}): {1}",
725                         streamid, cf);
726            response_cfs.add(cf);
727        }
728    }
729
730    // methods to update state and remove stream when finished
731
732    synchronized void requestSent() {
733        requestSent = true;
734        if (responseReceived) {
735            close();
736        }
737    }
738
739    final synchronized boolean isResponseReceived() {
740        return responseReceived;
741    }
742
743    synchronized void responseReceived() {
744        responseReceived = true;
745        if (requestSent) {
746            close();
747        }
748    }
749
750    /**
751     * same as above but for errors
752     */
753    void completeResponseExceptionally(Throwable t) {
754        synchronized (response_cfs) {
755            // use index to avoid ConcurrentModificationException
756            // caused by removing the CF from within the loop.
757            for (int i = 0; i < response_cfs.size(); i++) {
758                CompletableFuture<Response> cf = response_cfs.get(i);
759                if (!cf.isDone()) {
760                    cf.completeExceptionally(t);
761                    response_cfs.remove(i);
762                    return;
763                }
764            }
765            response_cfs.add(MinimalFuture.failedFuture(t));
766        }
767    }
768
769    CompletableFuture<Void> sendBodyImpl() {
770        RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
771        subscriber.setClient(client);
772        requestProcessor.subscribe(subscriber);
773        requestBodyCF.whenComplete((v,t) -> requestSent());
774        return requestBodyCF;
775    }
776
777    @Override
778    void cancel() {
779        cancel(new IOException("Stream " + streamid + " cancelled"));
780    }
781
782    @Override
783    void cancel(IOException cause) {
784        cancelImpl(cause);
785    }
786
787    // This method sends a RST_STREAM frame
788    void cancelImpl(Throwable e) {
789        if (Log.trace()) {
790            Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
791        }
792        boolean closing;
793        if (closing = !closed) { // assigning closing to !closed
794            synchronized (this) {
795                if (closing = !closed) { // assigning closing to !closed
796                    closed=true;
797                }
798            }
799        }
800        if (closing) { // true if the stream has not been closed yet
801            inputQ.close();
802        }
803        completeResponseExceptionally(e);
804        try {
805            // will send a RST_STREAM frame
806            if (streamid != 0) {
807                connection.resetStream(streamid, ResetFrame.CANCEL);
808            }
809        } catch (IOException ex) {
810            Log.logError(ex);
811        }
812    }
813
814    // This method doesn't send any frame
815    void close() {
816        if (closed) return;
817        synchronized(this) {
818            if (closed) return;
819            closed = true;
820        }
821        Log.logTrace("Closing stream {0}", streamid);
822        inputQ.close();
823        connection.closeStream(streamid);
824        Log.logTrace("Stream {0} closed", streamid);
825    }
826
827    static class PushedStream<U,T> extends Stream<T> {
828        final PushGroup<U,T> pushGroup;
829        private final Stream<T> parent;      // used by server push streams
830        // push streams need the response CF allocated up front as it is
831        // given directly to user via the multi handler callback function.
832        final CompletableFuture<Response> pushCF;
833        final CompletableFuture<HttpResponse<T>> responseCF;
834        final HttpRequestImpl pushReq;
835        HttpResponse.BodyHandler<T> pushHandler;
836
837        PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client,
838                Http2Connection connection, Stream<T> parent,
839                Exchange<T> pushReq) {
840            // ## no request body possible, null window controller
841            super(client, connection, pushReq, null);
842            this.pushGroup = pushGroup;
843            this.pushReq = pushReq.request();
844            this.pushCF = new MinimalFuture<>();
845            this.responseCF = new MinimalFuture<>();
846            this.parent = parent;
847        }
848
849        CompletableFuture<HttpResponse<T>> responseCF() {
850            return responseCF;
851        }
852
853        synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
854            this.pushHandler = pushHandler;
855        }
856
857        synchronized HttpResponse.BodyHandler<T> getPushHandler() {
858            // ignored parameters to function can be used as BodyHandler
859            return this.pushHandler;
860        }
861
862        // Following methods call the super class but in case of
863        // error record it in the PushGroup. The error method is called
864        // with a null value when no error occurred (is a no-op)
865        @Override
866        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
867            return super.sendBodyAsync()
868                        .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
869        }
870
871        @Override
872        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
873            return super.sendHeadersAsync()
874                        .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
875        }
876
877        @Override
878        CompletableFuture<Response> getResponseAsync(Executor executor) {
879            CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
880            if(executor!=null && !cf.isDone()) {
881                cf  = cf.thenApplyAsync( r -> r, executor);
882            }
883            return cf;
884        }
885
886        @Override
887        CompletableFuture<T> readBodyAsync(
888                HttpResponse.BodyHandler<T> handler,
889                boolean returnConnectionToPool,
890                Executor executor)
891        {
892            return super.readBodyAsync(handler, returnConnectionToPool, executor)
893                        .whenComplete((v, t) -> pushGroup.pushError(t));
894        }
895
896        @Override
897        void completeResponse(Response r) {
898            HttpResponseImpl.logResponse(r);
899            pushCF.complete(r); // not strictly required for push API
900            // start reading the body using the obtained BodyProcessor
901            CompletableFuture<Void> start = new MinimalFuture<>();
902            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
903                .whenComplete((T body, Throwable t) -> {
904                    if (t != null) {
905                        responseCF.completeExceptionally(t);
906                    } else {
907                        HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange());
908                        responseCF.complete(response);
909                    }
910                });
911            start.completeAsync(() -> null, getExchange().executor());
912        }
913
914        @Override
915        void completeResponseExceptionally(Throwable t) {
916            pushCF.completeExceptionally(t);
917        }
918
919        @Override
920        synchronized void responseReceived() {
921            super.responseReceived();
922        }
923
924        // create and return the PushResponseImpl
925        @Override
926        protected void handleResponse() {
927            HttpConnection c = connection.connection; // TODO: improve
928            responseCode = (int)responseHeaders
929                .firstValueAsLong(":status")
930                .orElse(-1);
931
932            if (responseCode == -1) {
933                completeResponseExceptionally(new IOException("No status code"));
934            }
935
936            this.response = new Response(
937                pushReq, exchange, responseHeaders,
938                responseCode, HttpClient.Version.HTTP_2);
939
940            this.responseContentLen = responseHeaders
941                .firstValueAsLong("content-length")
942                .orElse(-1L);
943
944            if (Log.headers()) {
945                StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
946                sb.append(" (streamid=").append(streamid).append("): ");
947                Log.dumpHeaders(sb, "    ", responseHeaders);
948                Log.logHeaders(sb.toString());
949            }
950
951            // different implementations for normal streams and pushed streams
952            completeResponse(response);
953        }
954    }
955
956    final class StreamWindowUpdateSender extends WindowUpdateSender {
957
958        StreamWindowUpdateSender(Http2Connection connection) {
959            super(connection);
960        }
961
962        @Override
963        int getStreamId() {
964            return streamid;
965        }
966    }
967
968}
969