// Stream.java revision 17210:ea0146845b79
1/* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26package jdk.incubator.http; 27 28import java.io.IOException; 29import java.net.URI; 30import java.nio.ByteBuffer; 31import java.util.ArrayList; 32import java.util.List; 33import java.util.Optional; 34import java.util.concurrent.CompletableFuture; 35import java.util.concurrent.CompletionException; 36import java.util.concurrent.ExecutionException; 37import java.util.concurrent.Executor; 38import java.util.concurrent.Flow; 39import java.util.concurrent.Flow.Subscription; 40import java.util.concurrent.TimeUnit; 41import java.util.concurrent.TimeoutException; 42import java.util.function.Consumer; 43 44import jdk.incubator.http.internal.common.*; 45import jdk.incubator.http.internal.frame.*; 46import jdk.incubator.http.internal.hpack.DecodingCallback; 47 48/** 49 * Http/2 Stream handling. 50 * 51 * REQUESTS 52 * 53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q 54 * 55 * sendRequest() -- sendHeadersOnly() + sendBody() 56 * 57 * sendBody() -- in calling thread: obeys all flow control (so may block) 58 * obtains data from request body processor and places on connection 59 * outbound Q. 60 * 61 * sendBodyAsync() -- calls sendBody() in an executor thread. 62 * 63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block 64 * 65 * sendRequestAsync() -- calls sendRequest() in an executor thread 66 * 67 * RESPONSES 68 * 69 * Multiple responses can be received per request. Responses are queued up on 70 * a LinkedList of CF<HttpResponse> and the the first one on the list is completed 71 * with the next response 72 * 73 * getResponseAsync() -- queries list of response CFs and returns first one 74 * if one exists. Otherwise, creates one and adds it to list 75 * and returns it. Completion is achieved through the 76 * incoming() upcall from connection reader thread. 
77 * 78 * getResponse() -- calls getResponseAsync() and waits for CF to complete 79 * 80 * responseBody() -- in calling thread: blocks for incoming DATA frames on 81 * stream inputQ. Obeys remote and local flow control so may block. 82 * Calls user response body processor with data buffers. 83 * 84 * responseBodyAsync() -- calls responseBody() in an executor thread. 85 * 86 * incoming() -- entry point called from connection reader thread. Frames are 87 * either handled immediately without blocking or for data frames 88 * placed on the stream's inputQ which is consumed by the stream's 89 * reader thread. 90 * 91 * PushedStream sub class 92 * ====================== 93 * Sending side methods are not used because the request comes from a PUSH_PROMISE 94 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream 95 * is created. PushedStream does not use responseCF list as there can be only 96 * one response. The CF is created when the object created and when the response 97 * HEADERS frame is received the object is completed. 98 */ 99class Stream<T> extends ExchangeImpl<T> { 100 101 final AsyncDataReadQueue inputQ = new AsyncDataReadQueue(); 102 103 /** 104 * This stream's identifier. Assigned lazily by the HTTP2Connection before 105 * the stream's first frame is sent. 
106 */ 107 protected volatile int streamid; 108 109 long responseContentLen = -1; 110 long responseBytesProcessed = 0; 111 long requestContentLen; 112 113 final Http2Connection connection; 114 HttpClientImpl client; 115 final HttpRequestImpl request; 116 final DecodingCallback rspHeadersConsumer; 117 HttpHeadersImpl responseHeaders; 118 final HttpHeadersImpl requestHeaders; 119 final HttpHeadersImpl requestPseudoHeaders; 120 HttpResponse.BodyProcessor<T> responseProcessor; 121 final HttpRequest.BodyProcessor requestProcessor; 122 volatile int responseCode; 123 volatile Response response; 124 volatile CompletableFuture<Response> responseCF; 125 final AbstractPushPublisher<ByteBuffer> publisher; 126 final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>(); 127 128 /** True if END_STREAM has been seen in a frame received on this stream. */ 129 private volatile boolean remotelyClosed; 130 private volatile boolean closed; 131 private volatile boolean endStreamSent; 132 133 // state flags 134 boolean requestSent, responseReceived, responseHeadersReceived; 135 136 /** 137 * A reference to this Stream's connection Send Window controller. The 138 * stream MUST acquire the appropriate amount of Send Window before 139 * sending any data. Will be null for PushStreams, as they cannot send data. 
140 */ 141 private final WindowController windowController; 142 private final WindowUpdateSender windowUpdater; 143 144 @Override 145 HttpConnection connection() { 146 return connection.connection; 147 } 148 149 @Override 150 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, 151 boolean returnConnectionToPool, 152 Executor executor) 153 { 154 Log.logTrace("Reading body on stream {0}", streamid); 155 responseProcessor = handler.apply(responseCode, responseHeaders); 156 setClientForResponse(responseProcessor); 157 publisher.subscribe(responseProcessor); 158 CompletableFuture<T> cf = receiveData(executor); 159 160 PushGroup<?,?> pg = exchange.getPushGroup(); 161 if (pg != null) { 162 // if an error occurs make sure it is recorded in the PushGroup 163 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 164 } 165 return cf; 166 } 167 168 @Override 169 T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool) 170 throws IOException 171 { 172 CompletableFuture<T> cf = readBodyAsync(handler, 173 returnConnectionToPool, 174 null); 175 try { 176 return cf.join(); 177 } catch (CompletionException e) { 178 throw Utils.getIOException(e); 179 } 180 } 181 182 @Override 183 public String toString() { 184 StringBuilder sb = new StringBuilder(); 185 sb.append("streamid: ") 186 .append(streamid); 187 return sb.toString(); 188 } 189 190 private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException { 191 if (frame instanceof ResetFrame) { 192 handleReset((ResetFrame) frame); 193 return true; 194 } else if (!(frame instanceof DataFrame)) { 195 assert false; 196 return true; 197 } 198 DataFrame df = (DataFrame) frame; 199 // RFC 7540 6.1: 200 // The entire DATA frame payload is included in flow control, 201 // including the Pad Length and Padding fields if present 202 int len = df.payloadLength(); 203 ByteBufferReference[] buffers = df.getData(); 204 for (ByteBufferReference b : buffers) { 205 ByteBuffer buf = b.get(); 
206 if (buf.hasRemaining()) { 207 publisher.acceptData(Optional.of(buf)); 208 } 209 } 210 connection.windowUpdater.update(len); 211 if (df.getFlag(DataFrame.END_STREAM)) { 212 setEndStreamReceived(); 213 publisher.acceptData(Optional.empty()); 214 return false; 215 } 216 // Don't send window update on a stream which is 217 // closed or half closed. 218 windowUpdater.update(len); 219 return true; 220 } 221 222 // pushes entire response body into response processor 223 // blocking when required by local or remote flow control 224 CompletableFuture<T> receiveData(Executor executor) { 225 CompletableFuture<T> cf = responseProcessor 226 .getBody() 227 .toCompletableFuture(); 228 Consumer<Throwable> onError = e -> { 229 Log.logTrace("receiveData: {0}", e.toString()); 230 e.printStackTrace(); 231 cf.completeExceptionally(e); 232 publisher.acceptError(e); 233 }; 234 if (executor == null) { 235 inputQ.blockingReceive(this::receiveDataFrame, onError); 236 } else { 237 inputQ.asyncReceive(executor, this::receiveDataFrame, onError); 238 } 239 return cf; 240 } 241 242 @Override 243 void sendBody() throws IOException { 244 try { 245 sendBodyImpl().join(); 246 } catch (CompletionException e) { 247 throw Utils.getIOException(e); 248 } 249 } 250 251 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 252 return sendBodyImpl().thenApply( v -> this); 253 } 254 255 @SuppressWarnings("unchecked") 256 Stream(HttpClientImpl client, 257 Http2Connection connection, 258 Exchange<T> e, 259 WindowController windowController) 260 { 261 super(e); 262 this.client = client; 263 this.connection = connection; 264 this.windowController = windowController; 265 this.request = e.request(); 266 this.requestProcessor = request.requestProcessor; 267 responseHeaders = new HttpHeadersImpl(); 268 requestHeaders = new HttpHeadersImpl(); 269 rspHeadersConsumer = (name, value) -> { 270 responseHeaders.addHeader(name.toString(), value.toString()); 271 if (Log.headers() && Log.trace()) { 272 
Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}", 273 streamid, name, value); 274 } 275 }; 276 this.requestPseudoHeaders = new HttpHeadersImpl(); 277 // NEW 278 this.publisher = new BlockingPushPublisher<>(); 279 this.windowUpdater = new StreamWindowUpdateSender(connection); 280 } 281 282 /** 283 * Entry point from Http2Connection reader thread. 284 * 285 * Data frames will be removed by response body thread. 286 */ 287 void incoming(Http2Frame frame) throws IOException { 288 if ((frame instanceof HeaderFrame)) { 289 HeaderFrame hframe = (HeaderFrame)frame; 290 if (hframe.endHeaders()) { 291 Log.logTrace("handling response (streamid={0})", streamid); 292 handleResponse(); 293 if (hframe.getFlag(HeaderFrame.END_STREAM)) { 294 inputQ.put(new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0])); 295 } 296 } 297 } else if (frame instanceof DataFrame) { 298 inputQ.put(frame); 299 } else { 300 otherFrame(frame); 301 } 302 } 303 304 void otherFrame(Http2Frame frame) throws IOException { 305 switch (frame.type()) { 306 case WindowUpdateFrame.TYPE: 307 incoming_windowUpdate((WindowUpdateFrame) frame); 308 break; 309 case ResetFrame.TYPE: 310 incoming_reset((ResetFrame) frame); 311 break; 312 case PriorityFrame.TYPE: 313 incoming_priority((PriorityFrame) frame); 314 break; 315 default: 316 String msg = "Unexpected frame: " + frame.toString(); 317 throw new IOException(msg); 318 } 319 } 320 321 // The Hpack decoder decodes into one of these consumers of name,value pairs 322 323 DecodingCallback rspHeadersConsumer() { 324 return rspHeadersConsumer; 325 } 326 327 protected void handleResponse() throws IOException { 328 synchronized(this) { 329 responseHeadersReceived = true; 330 } 331 HttpConnection c = connection.connection; // TODO: improve 332 responseCode = (int)responseHeaders 333 .firstValueAsLong(":status") 334 .orElseThrow(() -> new IOException("no statuscode in response")); 335 336 response = new Response( 337 request, exchange, 
responseHeaders, 338 responseCode, HttpClient.Version.HTTP_2); 339 340 this.responseContentLen = responseHeaders 341 .firstValueAsLong("content-length") 342 .orElse(-1L); 343 344 if (Log.headers()) { 345 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 346 Log.dumpHeaders(sb, " ", responseHeaders); 347 Log.logHeaders(sb.toString()); 348 } 349 350 completeResponse(response); 351 } 352 353 void incoming_reset(ResetFrame frame) throws IOException { 354 Log.logTrace("Received RST_STREAM on stream {0}", streamid); 355 if (endStreamReceived()) { 356 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); 357 } else if (closed) { 358 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 359 } else { 360 boolean pushedToQueue = false; 361 synchronized(this) { 362 // if the response headers are not yet 363 // received, or the inputQueue is closed, handle reset directly. 364 // Otherwise, put it in the input queue in order to read all 365 // pending data frames first. Indeed, a server may send 366 // RST_STREAM after sending END_STREAM, in which case we should 367 // ignore it. However, we won't know if we have received END_STREAM 368 // or not until all pending data frames are read. 369 // Because the inputQ will not be read until the response 370 // headers are received, and because response headers won't be 371 // sent if the server sent RST_STREAM, then we must handle 372 // reset here directly unless responseHeadersReceived is true. 373 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame); 374 } 375 if (!pushedToQueue) { 376 // RST_STREAM was not pushed to the queue: handle it. 377 try { 378 handleReset(frame); 379 } catch (IOException io) { 380 completeResponseExceptionally(io); 381 } 382 } else { 383 // RST_STREAM was pushed to the queue. It will be handled by 384 // asyncReceive after all pending data frames have been 385 // processed. 
386 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); 387 } 388 } 389 } 390 391 void handleReset(ResetFrame frame) throws IOException { 392 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); 393 if (!closed) { 394 close(); 395 int error = frame.getErrorCode(); 396 throw new IOException(ErrorFrame.stringForCode(error)); 397 } else { 398 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); 399 } 400 } 401 402 void incoming_priority(PriorityFrame frame) { 403 // TODO: implement priority 404 throw new UnsupportedOperationException("Not implemented"); 405 } 406 407 private void incoming_windowUpdate(WindowUpdateFrame frame) 408 throws IOException 409 { 410 int amount = frame.getUpdate(); 411 if (amount <= 0) { 412 Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n", 413 streamid, streamid, amount); 414 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 415 } else { 416 assert streamid != 0; 417 boolean success = windowController.increaseStreamWindow(amount, streamid); 418 if (!success) { // overflow 419 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR); 420 } 421 } 422 } 423 424 void incoming_pushPromise(HttpRequestImpl pushReq, 425 PushedStream<?,T> pushStream) 426 throws IOException 427 { 428 if (Log.requests()) { 429 Log.logRequest("PUSH_PROMISE: " + pushReq.toString()); 430 } 431 PushGroup<?,T> pushGroup = exchange.getPushGroup(); 432 if (pushGroup == null || pushGroup.noMorePushes()) { 433 cancelImpl(new IllegalStateException("unexpected push promise" 434 + " on stream " + streamid)); 435 } 436 437 HttpResponse.MultiProcessor<?,T> proc = pushGroup.processor(); 438 439 CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF(); 440 441 Optional<HttpResponse.BodyHandler<T>> bpOpt = proc.onRequest( 442 pushReq); 443 444 if (!bpOpt.isPresent()) { 445 IOException ex = new IOException("Stream " 446 + streamid + " cancelled by user"); 447 if (Log.trace()) { 448 
Log.logTrace("No body processor for {0}: {1}", pushReq, 449 ex.getMessage()); 450 } 451 pushStream.cancelImpl(ex); 452 cf.completeExceptionally(ex); 453 return; 454 } 455 456 pushGroup.addPush(); 457 pushStream.requestSent(); 458 pushStream.setPushHandler(bpOpt.get()); 459 // setup housekeeping for when the push is received 460 // TODO: deal with ignoring of CF anti-pattern 461 cf.whenComplete((HttpResponse<T> resp, Throwable t) -> { 462 if (Log.trace()) { 463 Log.logTrace("Push completed on stream {0} for {1}{2}", 464 pushStream.streamid, resp, 465 ((t==null) ? "": " with exception " + t)); 466 } 467 if (t != null) { 468 pushGroup.pushError(t); 469 proc.onError(pushReq, t); 470 } else { 471 proc.onResponse(resp); 472 } 473 pushGroup.pushCompleted(); 474 }); 475 476 } 477 478 private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) { 479 HttpHeadersImpl h = request.getSystemHeaders(); 480 if (contentLength > 0) { 481 h.setHeader("content-length", Long.toString(contentLength)); 482 } 483 setPseudoHeaderFields(); 484 OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this); 485 if (contentLength == 0) { 486 f.setFlag(HeadersFrame.END_STREAM); 487 endStreamSent = true; 488 } 489 return f; 490 } 491 492 private void setPseudoHeaderFields() { 493 HttpHeadersImpl hdrs = requestPseudoHeaders; 494 String method = request.method(); 495 hdrs.setHeader(":method", method); 496 URI uri = request.uri(); 497 hdrs.setHeader(":scheme", uri.getScheme()); 498 // TODO: userinfo deprecated. Needs to be removed 499 hdrs.setHeader(":authority", uri.getAuthority()); 500 // TODO: ensure header names beginning with : not in user headers 501 String query = uri.getQuery(); 502 String path = uri.getPath(); 503 if (path == null || path.isEmpty()) { 504 if (method.equalsIgnoreCase("OPTIONS")) { 505 path = "*"; 506 } else { 507 path = "/"; 508 } 509 } 510 if (query != null) { 511 path += "?" 
+ query; 512 } 513 hdrs.setHeader(":path", path); 514 } 515 516 HttpHeadersImpl getRequestPseudoHeaders() { 517 return requestPseudoHeaders; 518 } 519 520 @Override 521 Response getResponse() throws IOException { 522 try { 523 if (request.duration() != null) { 524 Log.logTrace("Waiting for response (streamid={0}, timeout={1}ms)", 525 streamid, 526 request.duration().toMillis()); 527 return getResponseAsync(null).get( 528 request.duration().toMillis(), TimeUnit.MILLISECONDS); 529 } else { 530 Log.logTrace("Waiting for response (streamid={0})", streamid); 531 return getResponseAsync(null).join(); 532 } 533 } catch (TimeoutException e) { 534 Log.logTrace("Response timeout (streamid={0})", streamid); 535 throw new HttpTimeoutException("Response timed out"); 536 } catch (InterruptedException | ExecutionException | CompletionException e) { 537 Throwable t = e.getCause(); 538 Log.logTrace("Response failed (streamid={0}): {1}", streamid, t); 539 if (t instanceof IOException) { 540 throw (IOException)t; 541 } 542 throw new IOException(e); 543 } finally { 544 Log.logTrace("Got response or failed (streamid={0})", streamid); 545 } 546 } 547 548 /** Sets endStreamReceived. Should be called only once. */ 549 void setEndStreamReceived() { 550 assert remotelyClosed == false: "Unexpected endStream already set"; 551 remotelyClosed = true; 552 responseReceived(); 553 } 554 555 /** Tells whether, or not, the END_STREAM Flag has been seen in any frame 556 * received on this stream. 
*/ 557 private boolean endStreamReceived() { 558 return remotelyClosed; 559 } 560 561 @Override 562 void sendHeadersOnly() throws IOException, InterruptedException { 563 if (Log.requests() && request != null) { 564 Log.logRequest(request.toString()); 565 } 566 requestContentLen = requestProcessor.contentLength(); 567 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); 568 connection.sendFrame(f); 569 } 570 571 void registerStream(int id) { 572 this.streamid = id; 573 connection.putStream(this, streamid); 574 } 575 576 class RequestSubscriber 577 extends RequestProcessors.ProcessorBase 578 implements Flow.Subscriber<ByteBuffer> 579 { 580 // can be < 0 if the actual length is not known. 581 private volatile long remainingContentLength; 582 private volatile Subscription subscription; 583 584 RequestSubscriber(long contentLen) { 585 this.remainingContentLength = contentLen; 586 } 587 588 @Override 589 public void onSubscribe(Flow.Subscription subscription) { 590 if (this.subscription != null) { 591 throw new IllegalStateException(); 592 } 593 this.subscription = subscription; 594 subscription.request(1); 595 } 596 597 @Override 598 public void onNext(ByteBuffer item) { 599 if (requestBodyCF.isDone()) { 600 throw new IllegalStateException(); 601 } 602 603 try { 604 while (item.hasRemaining()) { 605 assert !endStreamSent : "internal error, send data after END_STREAM flag"; 606 DataFrame df = getDataFrame(item); 607 if (remainingContentLength > 0) { 608 remainingContentLength -= df.getDataLength(); 609 assert remainingContentLength >= 0; 610 if (remainingContentLength == 0) { 611 df.setFlag(DataFrame.END_STREAM); 612 endStreamSent = true; 613 } 614 } 615 connection.sendDataFrame(df); 616 } 617 subscription.request(1); 618 } catch (InterruptedException ex) { 619 subscription.cancel(); 620 requestBodyCF.completeExceptionally(ex); 621 } 622 } 623 624 @Override 625 public void onError(Throwable throwable) { 626 if (requestBodyCF.isDone()) { 627 return; 628 } 629 
subscription.cancel(); 630 requestBodyCF.completeExceptionally(throwable); 631 } 632 633 @Override 634 public void onComplete() { 635 assert endStreamSent || remainingContentLength < 0; 636 try { 637 if (!endStreamSent) { 638 endStreamSent = true; 639 connection.sendDataFrame(getEmptyEndStreamDataFrame()); 640 } 641 requestBodyCF.complete(null); 642 } catch (InterruptedException ex) { 643 requestBodyCF.completeExceptionally(ex); 644 } 645 } 646 } 647 648 DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException { 649 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); 650 // blocks waiting for stream send window, if exhausted 651 int actualAmount = windowController.tryAcquire(requestAmount, streamid); 652 ByteBuffer outBuf = Utils.slice(buffer, actualAmount); 653 DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf)); 654 return df; 655 } 656 657 private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException { 658 return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]); 659 } 660 661 /** 662 * A List of responses relating to this stream. Normally there is only 663 * one response, but intermediate responses like 100 are allowed 664 * and must be passed up to higher level before continuing. Deals with races 665 * such as if responses are returned before the CFs get created by 666 * getResponseAsync() 667 */ 668 669 final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5); 670 671 @Override 672 CompletableFuture<Response> getResponseAsync(Executor executor) { 673 CompletableFuture<Response> cf = null; 674 // The code below deals with race condition that can be caused when 675 // completeResponse() is being called before getResponseAsync() 676 synchronized (response_cfs) { 677 if (!response_cfs.isEmpty()) { 678 // This CompletableFuture was created by completeResponse(). 679 // it will be already completed. 
680 cf = response_cfs.remove(0); 681 // if we find a cf here it should be already completed. 682 // finding a non completed cf should not happen. just assert it. 683 assert cf.isDone() : "Removing uncompleted response: could cause code to hang!"; 684 } else { 685 // getResponseAsync() is called first. Create a CompletableFuture 686 // that will be completed by completeResponse() when 687 // completeResponse() is called. 688 cf = new MinimalFuture<>(); 689 response_cfs.add(cf); 690 } 691 } 692 if (executor != null && !cf.isDone()) { 693 // protect from executing later chain of CompletableFuture operations from SelectorManager thread 694 cf = cf.thenApplyAsync(r -> r, executor); 695 } 696 Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); 697 PushGroup<?,?> pg = exchange.getPushGroup(); 698 if (pg != null) { 699 // if an error occurs make sure it is recorded in the PushGroup 700 cf = cf.whenComplete((t,e) -> pg.pushError(e)); 701 } 702 return cf; 703 } 704 705 /** 706 * Completes the first uncompleted CF on list, and removes it. If there is no 707 * uncompleted CF then creates one (completes it) and adds to list 708 */ 709 void completeResponse(Response resp) { 710 synchronized (response_cfs) { 711 CompletableFuture<Response> cf; 712 int cfs_len = response_cfs.size(); 713 for (int i=0; i<cfs_len; i++) { 714 cf = response_cfs.get(i); 715 if (!cf.isDone()) { 716 Log.logTrace("Completing response (streamid={0}): {1}", 717 streamid, cf); 718 cf.complete(resp); 719 response_cfs.remove(cf); 720 return; 721 } // else we found the previous response: just leave it alone. 
722 } 723 cf = MinimalFuture.completedFuture(resp); 724 Log.logTrace("Created completed future (streamid={0}): {1}", 725 streamid, cf); 726 response_cfs.add(cf); 727 } 728 } 729 730 // methods to update state and remove stream when finished 731 732 synchronized void requestSent() { 733 requestSent = true; 734 if (responseReceived) { 735 close(); 736 } 737 } 738 739 final synchronized boolean isResponseReceived() { 740 return responseReceived; 741 } 742 743 synchronized void responseReceived() { 744 responseReceived = true; 745 if (requestSent) { 746 close(); 747 } 748 } 749 750 /** 751 * same as above but for errors 752 */ 753 void completeResponseExceptionally(Throwable t) { 754 synchronized (response_cfs) { 755 // use index to avoid ConcurrentModificationException 756 // caused by removing the CF from within the loop. 757 for (int i = 0; i < response_cfs.size(); i++) { 758 CompletableFuture<Response> cf = response_cfs.get(i); 759 if (!cf.isDone()) { 760 cf.completeExceptionally(t); 761 response_cfs.remove(i); 762 return; 763 } 764 } 765 response_cfs.add(MinimalFuture.failedFuture(t)); 766 } 767 } 768 769 CompletableFuture<Void> sendBodyImpl() { 770 RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); 771 subscriber.setClient(client); 772 requestProcessor.subscribe(subscriber); 773 requestBodyCF.whenComplete((v,t) -> requestSent()); 774 return requestBodyCF; 775 } 776 777 @Override 778 void cancel() { 779 cancel(new IOException("Stream " + streamid + " cancelled")); 780 } 781 782 @Override 783 void cancel(IOException cause) { 784 cancelImpl(cause); 785 } 786 787 // This method sends a RST_STREAM frame 788 void cancelImpl(Throwable e) { 789 if (Log.trace()) { 790 Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); 791 } 792 boolean closing; 793 if (closing = !closed) { // assigning closing to !closed 794 synchronized (this) { 795 if (closing = !closed) { // assigning closing to !closed 796 closed=true; 797 } 798 } 799 } 800 if (closing) { 
// true if the stream has not been closed yet 801 inputQ.close(); 802 } 803 completeResponseExceptionally(e); 804 try { 805 // will send a RST_STREAM frame 806 if (streamid != 0) { 807 connection.resetStream(streamid, ResetFrame.CANCEL); 808 } 809 } catch (IOException ex) { 810 Log.logError(ex); 811 } 812 } 813 814 // This method doesn't send any frame 815 void close() { 816 if (closed) return; 817 synchronized(this) { 818 if (closed) return; 819 closed = true; 820 } 821 Log.logTrace("Closing stream {0}", streamid); 822 inputQ.close(); 823 connection.closeStream(streamid); 824 Log.logTrace("Stream {0} closed", streamid); 825 } 826 827 static class PushedStream<U,T> extends Stream<T> { 828 final PushGroup<U,T> pushGroup; 829 private final Stream<T> parent; // used by server push streams 830 // push streams need the response CF allocated up front as it is 831 // given directly to user via the multi handler callback function. 832 final CompletableFuture<Response> pushCF; 833 final CompletableFuture<HttpResponse<T>> responseCF; 834 final HttpRequestImpl pushReq; 835 HttpResponse.BodyHandler<T> pushHandler; 836 837 PushedStream(PushGroup<U,T> pushGroup, HttpClientImpl client, 838 Http2Connection connection, Stream<T> parent, 839 Exchange<T> pushReq) { 840 // ## no request body possible, null window controller 841 super(client, connection, pushReq, null); 842 this.pushGroup = pushGroup; 843 this.pushReq = pushReq.request(); 844 this.pushCF = new MinimalFuture<>(); 845 this.responseCF = new MinimalFuture<>(); 846 this.parent = parent; 847 } 848 849 CompletableFuture<HttpResponse<T>> responseCF() { 850 return responseCF; 851 } 852 853 synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) { 854 this.pushHandler = pushHandler; 855 } 856 857 synchronized HttpResponse.BodyHandler<T> getPushHandler() { 858 // ignored parameters to function can be used as BodyHandler 859 return this.pushHandler; 860 } 861 862 // Following methods call the super class but in 
case of 863 // error record it in the PushGroup. The error method is called 864 // with a null value when no error occurred (is a no-op) 865 @Override 866 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 867 return super.sendBodyAsync() 868 .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t)); 869 } 870 871 @Override 872 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 873 return super.sendHeadersAsync() 874 .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t)); 875 } 876 877 @Override 878 CompletableFuture<Response> getResponseAsync(Executor executor) { 879 CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); 880 if(executor!=null && !cf.isDone()) { 881 cf = cf.thenApplyAsync( r -> r, executor); 882 } 883 return cf; 884 } 885 886 @Override 887 CompletableFuture<T> readBodyAsync( 888 HttpResponse.BodyHandler<T> handler, 889 boolean returnConnectionToPool, 890 Executor executor) 891 { 892 return super.readBodyAsync(handler, returnConnectionToPool, executor) 893 .whenComplete((v, t) -> pushGroup.pushError(t)); 894 } 895 896 @Override 897 void completeResponse(Response r) { 898 HttpResponseImpl.logResponse(r); 899 pushCF.complete(r); // not strictly required for push API 900 // start reading the body using the obtained BodyProcessor 901 CompletableFuture<Void> start = new MinimalFuture<>(); 902 start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) 903 .whenComplete((T body, Throwable t) -> { 904 if (t != null) { 905 responseCF.completeExceptionally(t); 906 } else { 907 HttpResponseImpl<T> response = new HttpResponseImpl<>(r.request, r, body, getExchange()); 908 responseCF.complete(response); 909 } 910 }); 911 start.completeAsync(() -> null, getExchange().executor()); 912 } 913 914 @Override 915 void completeResponseExceptionally(Throwable t) { 916 pushCF.completeExceptionally(t); 917 } 918 919 @Override 920 synchronized void responseReceived() { 
921 super.responseReceived(); 922 } 923 924 // create and return the PushResponseImpl 925 @Override 926 protected void handleResponse() { 927 HttpConnection c = connection.connection; // TODO: improve 928 responseCode = (int)responseHeaders 929 .firstValueAsLong(":status") 930 .orElse(-1); 931 932 if (responseCode == -1) { 933 completeResponseExceptionally(new IOException("No status code")); 934 } 935 936 this.response = new Response( 937 pushReq, exchange, responseHeaders, 938 responseCode, HttpClient.Version.HTTP_2); 939 940 this.responseContentLen = responseHeaders 941 .firstValueAsLong("content-length") 942 .orElse(-1L); 943 944 if (Log.headers()) { 945 StringBuilder sb = new StringBuilder("RESPONSE HEADERS"); 946 sb.append(" (streamid=").append(streamid).append("): "); 947 Log.dumpHeaders(sb, " ", responseHeaders); 948 Log.logHeaders(sb.toString()); 949 } 950 951 // different implementations for normal streams and pushed streams 952 completeResponse(response); 953 } 954 } 955 956 final class StreamWindowUpdateSender extends WindowUpdateSender { 957 958 StreamWindowUpdateSender(Http2Connection connection) { 959 super(connection); 960 } 961 962 @Override 963 int getStreamId() { 964 return streamid; 965 } 966 } 967 968} 969