1/*
2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http;
27
28import java.io.IOException;
29import java.io.UncheckedIOException;
30import java.net.InetSocketAddress;
31import java.net.ProxySelector;
32import java.net.SocketPermission;
33import java.net.URI;
34import java.net.URISyntaxException;
35import java.net.URLPermission;
36import java.security.AccessControlContext;
37import java.security.AccessController;
38import java.security.PrivilegedAction;
39import java.security.PrivilegedActionException;
40import java.security.PrivilegedExceptionAction;
41import java.util.LinkedList;
42import java.util.List;
43import java.util.concurrent.CompletableFuture;
44import java.util.concurrent.Executor;
45import jdk.incubator.http.internal.common.MinimalFuture;
46import jdk.incubator.http.internal.common.Utils;
47import jdk.incubator.http.internal.common.Log;
48
49/**
50 * One request/response exchange (handles 100/101 intermediate response also).
51 * depth field used to track number of times a new request is being sent
52 * for a given API request. If limit exceeded exception is thrown.
53 *
54 * Security check is performed here:
55 * - uses AccessControlContext captured at API level
56 * - checks for appropriate URLPermission for request
57 * - if permission allowed, grants equivalent SocketPermission to call
58 * - in case of direct HTTP proxy, checks additionally for access to proxy
59 *    (CONNECT proxying uses its own Exchange, so check done there)
60 *
61 */
62final class Exchange<T> {
63
64    final HttpRequestImpl request;
65    final HttpClientImpl client;
66    volatile ExchangeImpl<T> exchImpl;
67    // used to record possible cancellation raised before the exchImpl
68    // has been established.
69    private volatile IOException failed;
70    final List<SocketPermission> permissions = new LinkedList<>();
71    final AccessControlContext acc;
72    final MultiExchange<?,T> multi;
73    final Executor parentExecutor;
74    final HttpRequest.BodyProcessor requestProcessor;
75    boolean upgrading; // to HTTP/2
76    final PushGroup<?,T> pushGroup;
77
78    Exchange(HttpRequestImpl request, MultiExchange<?,T> multi) {
79        this.request = request;
80        this.upgrading = false;
81        this.client = multi.client();
82        this.multi = multi;
83        this.acc = multi.acc;
84        this.parentExecutor = multi.executor;
85        this.requestProcessor = request.requestProcessor;
86        this.pushGroup = multi.pushGroup;
87    }
88
89    /* If different AccessControlContext to be used  */
90    Exchange(HttpRequestImpl request,
91             MultiExchange<?,T> multi,
92             AccessControlContext acc)
93    {
94        this.request = request;
95        this.acc = acc;
96        this.upgrading = false;
97        this.client = multi.client();
98        this.multi = multi;
99        this.parentExecutor = multi.executor;
100        this.requestProcessor = request.requestProcessor;
101        this.pushGroup = multi.pushGroup;
102    }
103
104    PushGroup<?,T> getPushGroup() {
105        return pushGroup;
106    }
107
108    Executor executor() {
109        return parentExecutor;
110    }
111
112    public HttpRequestImpl request() {
113        return request;
114    }
115
116    HttpClientImpl client() {
117        return client;
118    }
119
120    public Response response() throws IOException, InterruptedException {
121        return responseImpl(null);
122    }
123
124    public T readBody(HttpResponse.BodyHandler<T> responseHandler) throws IOException {
125        // The connection will not be returned to the pool in the case of WebSocket
126        return exchImpl.readBody(responseHandler, !request.isWebSocket());
127    }
128
129    public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
130        // The connection will not be returned to the pool in the case of WebSocket
131        return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor);
132    }
133
134    public void cancel() {
135        // cancel can be called concurrently before or at the same time
136        // that the exchange impl is being established.
137        // In that case we won't be able to propagate the cancellation
138        // right away
139        if (exchImpl != null) {
140            exchImpl.cancel();
141        } else {
142            // no impl - can't cancel impl yet.
143            // call cancel(IOException) instead which takes care
144            // of race conditions between impl/cancel.
145            cancel(new IOException("Request cancelled"));
146        }
147    }
148
149    public void cancel(IOException cause) {
150        // If the impl is non null, propagate the exception right away.
151        // Otherwise record it so that it can be propagated once the
152        // exchange impl has been established.
153        ExchangeImpl<?> impl = exchImpl;
154        if (impl != null) {
155            // propagate the exception to the impl
156            impl.cancel(cause);
157        } else {
158            try {
159                // no impl yet. record the exception
160                failed = cause;
161                // now call checkCancelled to recheck the impl.
162                // if the failed state is set and the impl is not null, reset
163                // the failed state and propagate the exception to the impl.
164                checkCancelled(false);
165            } catch (IOException x) {
166                // should not happen - we passed 'false' above
167                throw new UncheckedIOException(x);
168            }
169        }
170    }
171
172    // This method will raise an exception if one was reported and if
173    // it is possible to do so. If the exception can be raised, then
174    // the failed state will be reset. Otherwise, the failed state
175    // will persist until the exception can be raised and the failed state
176    // can be cleared.
177    // Takes care of possible race conditions.
178    private void checkCancelled(boolean throwIfNoImpl) throws IOException {
179        ExchangeImpl<?> impl = null;
180        IOException cause = null;
181        if (failed != null) {
182            synchronized(this) {
183                cause = failed;
184                impl = exchImpl;
185                if (throwIfNoImpl || impl != null) {
186                    // The exception will be raised by one of the two methods
187                    // below: reset the failed state.
188                    failed = null;
189                }
190            }
191        }
192        if (cause == null) return;
193        if (impl != null) {
194            // The exception is raised by propagating it to the impl.
195            impl.cancel(cause);
196        } else if (throwIfNoImpl) {
197            // The exception is raised by throwing it immediately
198            throw cause;
199        } else {
200            Log.logTrace("Exchange: request [{0}/timeout={1}ms] no impl is set."
201                         + "\n\tCan''t cancel yet with {2}",
202                         request.uri(),
203                         request.duration() == null ? -1 :
204                         // calling duration.toMillis() can throw an exception.
205                         // this is just debugging, we don't care if it overflows.
206                         (request.duration().getSeconds() * 1000
207                          + request.duration().getNano() / 1000000),
208                         cause);
209        }
210    }
211
212    public void h2Upgrade() {
213        upgrading = true;
214        request.setH2Upgrade(client.client2());
215    }
216
217    static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0];
218
219    Response responseImpl(HttpConnection connection)
220        throws IOException, InterruptedException
221    {
222        SecurityException e = securityCheck(acc);
223        if (e != null) {
224            throw e;
225        }
226
227        if (permissions.size() > 0) {
228            try {
229                return AccessController.doPrivileged(
230                        (PrivilegedExceptionAction<Response>)() ->
231                             responseImpl0(connection),
232                        null,
233                        permissions.toArray(SOCKET_ARRAY));
234            } catch (Throwable ee) {
235                if (ee instanceof PrivilegedActionException) {
236                    ee = ee.getCause();
237                }
238                if (ee instanceof IOException) {
239                    throw (IOException) ee;
240                } else {
241                    throw new RuntimeException(ee); // TODO: fix
242                }
243            }
244        } else {
245            return responseImpl0(connection);
246        }
247    }
248
249    // get/set the exchange impl, solving race condition issues with
250    // potential concurrent calls to cancel() or cancel(IOException)
251    private void establishExchange(HttpConnection connection)
252        throws IOException, InterruptedException
253    {
254        // check if we have been cancelled first.
255        checkCancelled(true);
256        // not yet cancelled: create/get a new impl
257        exchImpl = ExchangeImpl.get(this, connection);
258        // recheck for cancelled, in case of race conditions
259        checkCancelled(true);
260        // now we're good to go. because exchImpl is no longer null
261        // cancel() will be able to propagate directly to the impl
262        // after this point.
263    }
264
265    private Response responseImpl0(HttpConnection connection)
266        throws IOException, InterruptedException
267    {
268        establishExchange(connection);
269        exchImpl.setClientForRequest(requestProcessor);
270        if (request.expectContinue()) {
271            Log.logTrace("Sending Expect: 100-Continue");
272            request.addSystemHeader("Expect", "100-Continue");
273            exchImpl.sendHeadersOnly();
274
275            Log.logTrace("Waiting for 407-Expectation-Failed or 100-Continue");
276            Response resp = exchImpl.getResponse();
277            HttpResponseImpl.logResponse(resp);
278            int rcode = resp.statusCode();
279            if (rcode != 100) {
280                Log.logTrace("Expectation failed: Received {0}",
281                             rcode);
282                if (upgrading && rcode == 101) {
283                    throw new IOException(
284                        "Unable to handle 101 while waiting for 100-Continue");
285                }
286                return resp;
287            }
288
289            Log.logTrace("Received 100-Continue: sending body");
290            exchImpl.sendBody();
291
292            Log.logTrace("Body sent: waiting for response");
293            resp = exchImpl.getResponse();
294            HttpResponseImpl.logResponse(resp);
295
296            return checkForUpgrade(resp, exchImpl);
297        } else {
298            exchImpl.sendHeadersOnly();
299            exchImpl.sendBody();
300            Response resp = exchImpl.getResponse();
301            HttpResponseImpl.logResponse(resp);
302            return checkForUpgrade(resp, exchImpl);
303        }
304    }
305
306    // Completed HttpResponse will be null if response succeeded
307    // will be a non null responseAsync if expect continue returns an error
308
309    public CompletableFuture<Response> responseAsync() {
310        return responseAsyncImpl(null);
311    }
312
313    CompletableFuture<Response> responseAsyncImpl(HttpConnection connection) {
314        SecurityException e = securityCheck(acc);
315        if (e != null) {
316            return MinimalFuture.failedFuture(e);
317        }
318        if (permissions.size() > 0) {
319            return AccessController.doPrivileged(
320                    (PrivilegedAction<CompletableFuture<Response>>)() ->
321                        responseAsyncImpl0(connection),
322                    null,
323                    permissions.toArray(SOCKET_ARRAY));
324        } else {
325            return responseAsyncImpl0(connection);
326        }
327    }
328
329    CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
330        try {
331            establishExchange(connection);
332        } catch (IOException | InterruptedException e) {
333            return MinimalFuture.failedFuture(e);
334        }
335        if (request.expectContinue()) {
336            request.addSystemHeader("Expect", "100-Continue");
337            Log.logTrace("Sending Expect: 100-Continue");
338            return exchImpl
339                    .sendHeadersAsync()
340                    .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
341                    .thenCompose((Response r1) -> {
342                        HttpResponseImpl.logResponse(r1);
343                        int rcode = r1.statusCode();
344                        if (rcode == 100) {
345                            Log.logTrace("Received 100-Continue: sending body");
346                            CompletableFuture<Response> cf =
347                                    exchImpl.sendBodyAsync()
348                                            .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
349                            cf = wrapForUpgrade(cf);
350                            cf = wrapForLog(cf);
351                            return cf;
352                        } else {
353                            Log.logTrace("Expectation failed: Received {0}",
354                                         rcode);
355                            if (upgrading && rcode == 101) {
356                                IOException failed = new IOException(
357                                        "Unable to handle 101 while waiting for 100");
358                                return MinimalFuture.failedFuture(failed);
359                            }
360                            return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
361                                  .thenApply(v ->  r1);
362                        }
363                    });
364        } else {
365            CompletableFuture<Response> cf = exchImpl
366                    .sendHeadersAsync()
367                    .thenCompose(ExchangeImpl::sendBodyAsync)
368                    .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
369            cf = wrapForUpgrade(cf);
370            cf = wrapForLog(cf);
371            return cf;
372        }
373    }
374
375    private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
376        if (upgrading) {
377            return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
378        }
379        return cf;
380    }
381
382    private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
383        if (Log.requests()) {
384            return cf.thenApply(response -> {
385                HttpResponseImpl.logResponse(response);
386                return response;
387            });
388        }
389        return cf;
390    }
391
392    HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) {
393        return HttpResponse.BodyProcessor.discard((T)null);
394    }
395
396    // if this response was received in reply to an upgrade
397    // then create the Http2Connection from the HttpConnection
398    // initialize it and wait for the real response on a newly created Stream
399
400    private CompletableFuture<Response>
401    checkForUpgradeAsync(Response resp,
402                         ExchangeImpl<T> ex) {
403
404        int rcode = resp.statusCode();
405        if (upgrading && (rcode == 101)) {
406            Http1Exchange<T> e = (Http1Exchange<T>)ex;
407            // check for 101 switching protocols
408            // 101 responses are not supposed to contain a body.
409            //    => should we fail if there is one?
410            return e.readBodyAsync(this::ignoreBody, false, parentExecutor)
411                .thenCompose((T v) -> // v is null
412                     Http2Connection.createAsync(e.connection(),
413                                                 client.client2(),
414                                                 this, e.getBuffer())
415                        .thenCompose((Http2Connection c) -> {
416                            c.putConnection();
417                            Stream<T> s = c.getStream(1);
418                            exchImpl = s;
419                            return s.getResponseAsync(null);
420                        })
421                );
422        }
423        return MinimalFuture.completedFuture(resp);
424    }
425
426    private Response checkForUpgrade(Response resp,
427                                             ExchangeImpl<T> ex)
428        throws IOException, InterruptedException
429    {
430        int rcode = resp.statusCode();
431        if (upgrading && (rcode == 101)) {
432            Http1Exchange<T> e = (Http1Exchange<T>) ex;
433
434            // 101 responses are not supposed to contain a body.
435            //    => should we fail if there is one?
436            //    => readBody called here by analogy with
437            //       checkForUpgradeAsync above
438            e.readBody(this::ignoreBody, false);
439
440            // must get connection from Http1Exchange
441            Http2Connection h2con = new Http2Connection(e.connection(),
442                                                        client.client2(),
443                                                        this, e.getBuffer());
444            h2con.putConnection();
445            Stream<T> s = h2con.getStream(1);
446            exchImpl = s;
447            Response xx = s.getResponse();
448            HttpResponseImpl.logResponse(xx);
449            return xx;
450        }
451        return resp;
452    }
453
454    private URI getURIForSecurityCheck() {
455        URI u;
456        String method = request.method();
457        InetSocketAddress authority = request.authority();
458        URI uri = request.uri();
459
460        // CONNECT should be restricted at API level
461        if (method.equalsIgnoreCase("CONNECT")) {
462            try {
463                u = new URI("socket",
464                             null,
465                             authority.getHostString(),
466                             authority.getPort(),
467                             null,
468                             null,
469                             null);
470            } catch (URISyntaxException e) {
471                throw new InternalError(e); // shouldn't happen
472            }
473        } else {
474            u = uri;
475        }
476        return u;
477    }
478
479    /**
480     * Do the security check and return any exception.
481     * Return null if no check needed or passes.
482     *
483     * Also adds any generated permissions to the "permissions" list.
484     */
485    private SecurityException securityCheck(AccessControlContext acc) {
486        SecurityManager sm = System.getSecurityManager();
487        if (sm == null) {
488            return null;
489        }
490
491        String method = request.method();
492        HttpHeaders userHeaders = request.getUserHeaders();
493        URI u = getURIForSecurityCheck();
494        URLPermission p = Utils.getPermission(u, method, userHeaders.map());
495
496        try {
497            assert acc != null;
498            sm.checkPermission(p, acc);
499            permissions.add(getSocketPermissionFor(u));
500        } catch (SecurityException e) {
501            return e;
502        }
503        ProxySelector ps = client.proxy().orElse(null);
504        if (ps != null) {
505            InetSocketAddress proxy = (InetSocketAddress)
506                    ps.select(u).get(0).address(); // TODO: check this
507            // may need additional check
508            if (!method.equals("CONNECT")) {
509                // a direct http proxy. Need to check access to proxy
510                try {
511                    u = new URI("socket", null, proxy.getHostString(),
512                        proxy.getPort(), null, null, null);
513                } catch (URISyntaxException e) {
514                    throw new InternalError(e); // shouldn't happen
515                }
516                p = new URLPermission(u.toString(), "CONNECT");
517                try {
518                    sm.checkPermission(p, acc);
519                } catch (SecurityException e) {
520                    permissions.clear();
521                    return e;
522                }
523                String sockperm = proxy.getHostString() +
524                        ":" + Integer.toString(proxy.getPort());
525
526                permissions.add(new SocketPermission(sockperm, "connect,resolve"));
527            }
528        }
529        return null;
530    }
531
532    HttpClient.Redirect followRedirects() {
533        return client.followRedirects();
534    }
535
536    HttpClient.Version version() {
537        return multi.version();
538    }
539
540    private static SocketPermission getSocketPermissionFor(URI url) {
541        if (System.getSecurityManager() == null) {
542            return null;
543        }
544
545        StringBuilder sb = new StringBuilder();
546        String host = url.getHost();
547        sb.append(host);
548        int port = url.getPort();
549        if (port == -1) {
550            String scheme = url.getScheme();
551            if ("http".equals(scheme)) {
552                sb.append(":80");
553            } else { // scheme must be https
554                sb.append(":443");
555            }
556        } else {
557            sb.append(':')
558              .append(Integer.toString(port));
559        }
560        String target = sb.toString();
561        return new SocketPermission(target, "connect");
562    }
563
564    AccessControlContext getAccessControlContext() {
565        return acc;
566    }
567}
568