1/* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26package jdk.incubator.http; 27 28import java.io.IOException; 29import java.io.UncheckedIOException; 30import java.net.InetSocketAddress; 31import java.net.ProxySelector; 32import java.net.SocketPermission; 33import java.net.URI; 34import java.net.URISyntaxException; 35import java.net.URLPermission; 36import java.security.AccessControlContext; 37import java.security.AccessController; 38import java.security.PrivilegedAction; 39import java.security.PrivilegedActionException; 40import java.security.PrivilegedExceptionAction; 41import java.util.LinkedList; 42import java.util.List; 43import java.util.concurrent.CompletableFuture; 44import java.util.concurrent.Executor; 45import jdk.incubator.http.internal.common.MinimalFuture; 46import jdk.incubator.http.internal.common.Utils; 47import jdk.incubator.http.internal.common.Log; 48 49/** 50 * One request/response exchange (handles 100/101 intermediate response also). 51 * depth field used to track number of times a new request is being sent 52 * for a given API request. If limit exceeded exception is thrown. 53 * 54 * Security check is performed here: 55 * - uses AccessControlContext captured at API level 56 * - checks for appropriate URLPermission for request 57 * - if permission allowed, grants equivalent SocketPermission to call 58 * - in case of direct HTTP proxy, checks additionally for access to proxy 59 * (CONNECT proxying uses its own Exchange, so check done there) 60 * 61 */ 62final class Exchange<T> { 63 64 final HttpRequestImpl request; 65 final HttpClientImpl client; 66 volatile ExchangeImpl<T> exchImpl; 67 // used to record possible cancellation raised before the exchImpl 68 // has been established. 69 private volatile IOException failed; 70 final List<SocketPermission> permissions = new LinkedList<>(); 71 final AccessControlContext acc; 72 final MultiExchange<?,T> multi; 73 final Executor parentExecutor; 74 final HttpRequest.BodyProcessor requestProcessor; 75 boolean upgrading; // to HTTP/2 76 final PushGroup<?,T> pushGroup; 77 78 Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) { 79 this.request = request; 80 this.upgrading = false; 81 this.client = multi.client(); 82 this.multi = multi; 83 this.acc = multi.acc; 84 this.parentExecutor = multi.executor; 85 this.requestProcessor = request.requestProcessor; 86 this.pushGroup = multi.pushGroup; 87 } 88 89 /* If different AccessControlContext to be used */ 90 Exchange(HttpRequestImpl request, 91 MultiExchange<?,T> multi, 92 AccessControlContext acc) 93 { 94 this.request = request; 95 this.acc = acc; 96 this.upgrading = false; 97 this.client = multi.client(); 98 this.multi = multi; 99 this.parentExecutor = multi.executor; 100 this.requestProcessor = request.requestProcessor; 101 this.pushGroup = multi.pushGroup; 102 } 103 104 PushGroup<?,T> getPushGroup() { 105 return pushGroup; 106 } 107 108 Executor executor() { 109 return parentExecutor; 110 } 111 112 public HttpRequestImpl request() { 113 return request; 114 } 115 116 HttpClientImpl client() { 117 return client; 118 } 119 120 public Response response() throws IOException, InterruptedException { 121 return responseImpl(null); 122 } 123 124 public T readBody(HttpResponse.BodyHandler<T> responseHandler) throws IOException { 125 // The connection will not be returned to the pool in the case of WebSocket 126 return exchImpl.readBody(responseHandler, !request.isWebSocket()); 127 } 128 129 public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) { 130 // The connection will not be returned to the pool in the case of WebSocket 131 return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor); 132 } 133 134 public void cancel() { 135 // cancel can be called concurrently before or at the same time 136 // that the exchange impl is being established. 137 // In that case we won't be able to propagate the cancellation 138 // right away 139 if (exchImpl != null) { 140 exchImpl.cancel(); 141 } else { 142 // no impl - can't cancel impl yet. 143 // call cancel(IOException) instead which takes care 144 // of race conditions between impl/cancel. 145 cancel(new IOException("Request cancelled")); 146 } 147 } 148 149 public void cancel(IOException cause) { 150 // If the impl is non null, propagate the exception right away. 151 // Otherwise record it so that it can be propagated once the 152 // exchange impl has been established. 153 ExchangeImpl<?> impl = exchImpl; 154 if (impl != null) { 155 // propagate the exception to the impl 156 impl.cancel(cause); 157 } else { 158 try { 159 // no impl yet. record the exception 160 failed = cause; 161 // now call checkCancelled to recheck the impl. 162 // if the failed state is set and the impl is not null, reset 163 // the failed state and propagate the exception to the impl. 164 checkCancelled(false); 165 } catch (IOException x) { 166 // should not happen - we passed 'false' above 167 throw new UncheckedIOException(x); 168 } 169 } 170 } 171 172 // This method will raise an exception if one was reported and if 173 // it is possible to do so. If the exception can be raised, then 174 // the failed state will be reset. Otherwise, the failed state 175 // will persist until the exception can be raised and the failed state 176 // can be cleared. 177 // Takes care of possible race conditions. 178 private void checkCancelled(boolean throwIfNoImpl) throws IOException { 179 ExchangeImpl<?> impl = null; 180 IOException cause = null; 181 if (failed != null) { 182 synchronized(this) { 183 cause = failed; 184 impl = exchImpl; 185 if (throwIfNoImpl || impl != null) { 186 // The exception will be raised by one of the two methods 187 // below: reset the failed state. 188 failed = null; 189 } 190 } 191 } 192 if (cause == null) return; 193 if (impl != null) { 194 // The exception is raised by propagating it to the impl. 195 impl.cancel(cause); 196 } else if (throwIfNoImpl) { 197 // The exception is raised by throwing it immediately 198 throw cause; 199 } else { 200 Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set." 201 + "\n\tCan''t cancel yet with {2}", 202 request.uri(), 203 request.duration() == null ? -1 : 204 // calling duration.toMillis() can throw an exception. 205 // this is just debugging, we don't care if it overflows. 206 (request.duration().getSeconds() * 1000 207 + request.duration().getNano() / 1000000), 208 cause); 209 } 210 } 211 212 public void h2Upgrade() { 213 upgrading = true; 214 request.setH2Upgrade(client.client2()); 215 } 216 217 static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0]; 218 219 Response responseImpl(HttpConnection connection) 220 throws IOException, InterruptedException 221 { 222 SecurityException e = securityCheck(acc); 223 if (e != null) { 224 throw e; 225 } 226 227 if (permissions.size() > 0) { 228 try { 229 return AccessController.doPrivileged( 230 (PrivilegedExceptionAction<Response>)() -> 231 responseImpl0(connection), 232 null, 233 permissions.toArray(SOCKET_ARRAY)); 234 } catch (Throwable ee) { 235 if (ee instanceof PrivilegedActionException) { 236 ee = ee.getCause(); 237 } 238 if (ee instanceof IOException) { 239 throw (IOException) ee; 240 } else { 241 throw new RuntimeException(ee); // TODO: fix 242 } 243 } 244 } else { 245 return responseImpl0(connection); 246 } 247 } 248 249 // get/set the exchange impl, solving race condition issues with 250 // potential concurrent calls to cancel() or cancel(IOException) 251 private void establishExchange(HttpConnection connection) 252 throws IOException, InterruptedException 253 { 254 // check if we have been cancelled first. 255 checkCancelled(true); 256 // not yet cancelled: create/get a new impl 257 exchImpl = ExchangeImpl.get(this, connection); 258 // recheck for cancelled, in case of race conditions 259 checkCancelled(true); 260 // now we're good to go. because exchImpl is no longer null 261 // cancel() will be able to propagate directly to the impl 262 // after this point. 263 } 264 265 private Response responseImpl0(HttpConnection connection) 266 throws IOException, InterruptedException 267 { 268 establishExchange(connection); 269 exchImpl.setClientForRequest(requestProcessor); 270 if (request.expectContinue()) { 271 Log.logTrace("Sending Expect: 100-Continue"); 272 request.addSystemHeader("Expect", "100-Continue"); 273 exchImpl.sendHeadersOnly(); 274 275 Log.logTrace("Waiting for 407-Expectation-Failed or 100-Continue"); 276 Response resp = exchImpl.getResponse(); 277 HttpResponseImpl.logResponse(resp); 278 int rcode = resp.statusCode(); 279 if (rcode != 100) { 280 Log.logTrace("Expectation failed: Received {0}", 281 rcode); 282 if (upgrading && rcode == 101) { 283 throw new IOException( 284 "Unable to handle 101 while waiting for 100-Continue"); 285 } 286 return resp; 287 } 288 289 Log.logTrace("Received 100-Continue: sending body"); 290 exchImpl.sendBody(); 291 292 Log.logTrace("Body sent: waiting for response"); 293 resp = exchImpl.getResponse(); 294 HttpResponseImpl.logResponse(resp); 295 296 return checkForUpgrade(resp, exchImpl); 297 } else { 298 exchImpl.sendHeadersOnly(); 299 exchImpl.sendBody(); 300 Response resp = exchImpl.getResponse(); 301 HttpResponseImpl.logResponse(resp); 302 return checkForUpgrade(resp, exchImpl); 303 } 304 } 305 306 // Completed HttpResponse will be null if response succeeded 307 // will be a non null responseAsync if expect continue returns an error 308 309 public CompletableFuture<Response> responseAsync() { 310 return responseAsyncImpl(null); 311 } 312 313 CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) { 314 SecurityException e = securityCheck(acc); 315 if (e != null) { 316 return MinimalFuture.failedFuture(e); 317 } 318 if (permissions.size() > 0) { 319 return AccessController.doPrivileged( 320 (PrivilegedAction<CompletableFuture<Response>>)() -> 321 responseAsyncImpl0(connection), 322 null, 323 permissions.toArray(SOCKET_ARRAY)); 324 } else { 325 return responseAsyncImpl0(connection); 326 } 327 } 328 329 CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) { 330 try { 331 establishExchange(connection); 332 } catch (IOException | InterruptedException e) { 333 return MinimalFuture.failedFuture(e); 334 } 335 if (request.expectContinue()) { 336 request.addSystemHeader("Expect", "100-Continue"); 337 Log.logTrace("Sending Expect: 100-Continue"); 338 return exchImpl 339 .sendHeadersAsync() 340 .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor)) 341 .thenCompose((Response r1) -> { 342 HttpResponseImpl.logResponse(r1); 343 int rcode = r1.statusCode(); 344 if (rcode == 100) { 345 Log.logTrace("Received 100-Continue: sending body"); 346 CompletableFuture<Response> cf = 347 exchImpl.sendBodyAsync() 348 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); 349 cf = wrapForUpgrade(cf); 350 cf = wrapForLog(cf); 351 return cf; 352 } else { 353 Log.logTrace("Expectation failed: Received {0}", 354 rcode); 355 if (upgrading && rcode == 101) { 356 IOException failed = new IOException( 357 "Unable to handle 101 while waiting for 100"); 358 return MinimalFuture.failedFuture(failed); 359 } 360 return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor) 361 .thenApply(v -> r1); 362 } 363 }); 364 } else { 365 CompletableFuture<Response> cf = exchImpl 366 .sendHeadersAsync() 367 .thenCompose(ExchangeImpl::sendBodyAsync) 368 .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); 369 cf = wrapForUpgrade(cf); 370 cf = wrapForLog(cf); 371 return cf; 372 } 373 } 374 375 private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) { 376 if (upgrading) { 377 return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl)); 378 } 379 return cf; 380 } 381 382 private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) { 383 if (Log.requests()) { 384 return cf.thenApply(response -> { 385 HttpResponseImpl.logResponse(response); 386 return response; 387 }); 388 } 389 return cf; 390 } 391 392 HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) { 393 return HttpResponse.BodyProcessor.discard((T)null); 394 } 395 396 // if this response was received in reply to an upgrade 397 // then create the Http2Connection from the HttpConnection 398 // initialize it and wait for the real response on a newly created Stream 399 400 private CompletableFuture<Response> 401 checkForUpgradeAsync(Response resp, 402 ExchangeImpl<T> ex) { 403 404 int rcode = resp.statusCode(); 405 if (upgrading && (rcode == 101)) { 406 Http1Exchange<T> e = (Http1Exchange<T>)ex; 407 // check for 101 switching protocols 408 // 101 responses are not supposed to contain a body. 409 // => should we fail if there is one? 410 return e.readBodyAsync(this::ignoreBody, false, parentExecutor) 411 .thenCompose((T v) -> // v is null 412 Http2Connection.createAsync(e.connection(), 413 client.client2(), 414 this, e.getBuffer()) 415 .thenCompose((Http2Connection c) -> { 416 c.putConnection(); 417 Stream<T> s = c.getStream(1); 418 exchImpl = s; 419 return s.getResponseAsync(null); 420 }) 421 ); 422 } 423 return MinimalFuture.completedFuture(resp); 424 } 425 426 private Response checkForUpgrade(Response resp, 427 ExchangeImpl<T> ex) 428 throws IOException, InterruptedException 429 { 430 int rcode = resp.statusCode(); 431 if (upgrading && (rcode == 101)) { 432 Http1Exchange<T> e = (Http1Exchange<T>) ex; 433 434 // 101 responses are not supposed to contain a body. 435 // => should we fail if there is one? 436 // => readBody called here by analogy with 437 // checkForUpgradeAsync above 438 e.readBody(this::ignoreBody, false); 439 440 // must get connection from Http1Exchange 441 Http2Connection h2con = new Http2Connection(e.connection(), 442 client.client2(), 443 this, e.getBuffer()); 444 h2con.putConnection(); 445 Stream<T> s = h2con.getStream(1); 446 exchImpl = s; 447 Response xx = s.getResponse(); 448 HttpResponseImpl.logResponse(xx); 449 return xx; 450 } 451 return resp; 452 } 453 454 private URI getURIForSecurityCheck() { 455 URI u; 456 String method = request.method(); 457 InetSocketAddress authority = request.authority(); 458 URI uri = request.uri(); 459 460 // CONNECT should be restricted at API level 461 if (method.equalsIgnoreCase("CONNECT")) { 462 try { 463 u = new URI("socket", 464 null, 465 authority.getHostString(), 466 authority.getPort(), 467 null, 468 null, 469 null); 470 } catch (URISyntaxException e) { 471 throw new InternalError(e); // shouldn't happen 472 } 473 } else { 474 u = uri; 475 } 476 return u; 477 } 478 479 /** 480 * Do the security check and return any exception. 481 * Return null if no check needed or passes. 482 * 483 * Also adds any generated permissions to the "permissions" list. 484 */ 485 private SecurityException securityCheck(AccessControlContext acc) { 486 SecurityManager sm = System.getSecurityManager(); 487 if (sm == null) { 488 return null; 489 } 490 491 String method = request.method(); 492 HttpHeaders userHeaders = request.getUserHeaders(); 493 URI u = getURIForSecurityCheck(); 494 URLPermission p = Utils.getPermission(u, method, userHeaders.map()); 495 496 try { 497 assert acc != null; 498 sm.checkPermission(p, acc); 499 permissions.add(getSocketPermissionFor(u)); 500 } catch (SecurityException e) { 501 return e; 502 } 503 ProxySelector ps = client.proxy().orElse(null); 504 if (ps != null) { 505 InetSocketAddress proxy = (InetSocketAddress) 506 ps.select(u).get(0).address(); // TODO: check this 507 // may need additional check 508 if (!method.equals("CONNECT")) { 509 // a direct http proxy. Need to check access to proxy 510 try { 511 u = new URI("socket", null, proxy.getHostString(), 512 proxy.getPort(), null, null, null); 513 } catch (URISyntaxException e) { 514 throw new InternalError(e); // shouldn't happen 515 } 516 p = new URLPermission(u.toString(), "CONNECT"); 517 try { 518 sm.checkPermission(p, acc); 519 } catch (SecurityException e) { 520 permissions.clear(); 521 return e; 522 } 523 String sockperm = proxy.getHostString() + 524 ":" + Integer.toString(proxy.getPort()); 525 526 permissions.add(new SocketPermission(sockperm, "connect,resolve")); 527 } 528 } 529 return null; 530 } 531 532 HttpClient.Redirect followRedirects() { 533 return client.followRedirects(); 534 } 535 536 HttpClient.Version version() { 537 return multi.version(); 538 } 539 540 private static SocketPermission getSocketPermissionFor(URI url) { 541 if (System.getSecurityManager() == null) { 542 return null; 543 } 544 545 StringBuilder sb = new StringBuilder(); 546 String host = url.getHost(); 547 sb.append(host); 548 int port = url.getPort(); 549 if (port == -1) { 550 String scheme = url.getScheme(); 551 if ("http".equals(scheme)) { 552 sb.append(":80"); 553 } else { // scheme must be https 554 sb.append(":443"); 555 } 556 } else { 557 sb.append(':') 558 .append(Integer.toString(port)); 559 } 560 String target = sb.toString(); 561 return new SocketPermission(target, "connect"); 562 } 563 564 AccessControlContext getAccessControlContext() { 565 return acc; 566 } 567} 568