1/*
2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http;
27
28import java.io.IOException;
29import java.time.Duration;
30import java.util.List;
31import java.security.AccessControlContext;
32import java.security.AccessController;
33import java.util.concurrent.CompletableFuture;
34import java.util.concurrent.CompletionException;
35import java.util.concurrent.ExecutionException;
36import java.util.function.BiFunction;
37import java.util.concurrent.Executor;
38import java.util.function.UnaryOperator;
39
40import jdk.incubator.http.internal.common.Log;
41import jdk.incubator.http.internal.common.MinimalFuture;
42import jdk.incubator.http.internal.common.Pair;
43import jdk.incubator.http.internal.common.Utils;
44import static jdk.incubator.http.internal.common.Pair.pair;
45
46/**
47 * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
48 * - manages filters
49 * - retries due to filters.
50 * - I/O errors and most other exceptions get returned directly to user
51 *
52 * Creates a new Exchange for each request/response interaction
53 */
54class MultiExchange<U,T> {
55
56    private final HttpRequest userRequest; // the user request
57    private final HttpRequestImpl request; // a copy of the user request
58    final AccessControlContext acc;
59    final HttpClientImpl client;
60    final HttpResponse.BodyHandler<T> responseHandler;
61    final ExecutorWrapper execWrapper;
62    final Executor executor;
63    final HttpResponse.MultiProcessor<U,T> multiResponseHandler;
64    HttpRequestImpl currentreq; // used for async only
65    Exchange<T> exchange; // the current exchange
66    Exchange<T> previous;
67    int attempts;
68    // Maximum number of times a request will be retried/redirected
69    // for any reason
70
71    static final int DEFAULT_MAX_ATTEMPTS = 5;
72    static final int max_attempts = Utils.getIntegerNetProperty(
73            "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
74    );
75
76    private final List<HeaderFilter> filters;
77    TimedEvent timedEvent;
78    volatile boolean cancelled;
79    final PushGroup<U,T> pushGroup;
80
81    /**
82     * Filter fields. These are attached as required by filters
83     * and only used by the filter implementations. This could be
84     * generalised into Objects that are passed explicitly to the filters
85     * (one per MultiExchange object, and one per Exchange object possibly)
86     */
87    volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
88    // RedirectHandler
89    volatile int numberOfRedirects = 0;
90
91    /**
92     * MultiExchange with one final response.
93     */
94    MultiExchange(HttpRequest req,
95                  HttpClientImpl client,
96                  HttpResponse.BodyHandler<T> responseHandler) {
97        this.previous = null;
98        this.userRequest = req;
99        this.request = new HttpRequestImpl(req);
100        this.currentreq = request;
101        this.attempts = 0;
102        this.client = client;
103        this.filters = client.filterChain();
104        if (System.getSecurityManager() != null) {
105            this.acc = AccessController.getContext();
106        } else {
107            this.acc = null;
108        }
109        this.execWrapper = new ExecutorWrapper(client.executor(), acc);
110        this.executor = execWrapper.executor();
111        this.responseHandler = responseHandler;
112        this.exchange = new Exchange<>(request, this);
113        this.multiResponseHandler = null;
114        this.pushGroup = null;
115    }
116
117    /**
118     * MultiExchange with multiple responses (HTTP/2 server pushes).
119     */
120    MultiExchange(HttpRequest req,
121                  HttpClientImpl client,
122                  HttpResponse.MultiProcessor<U, T> multiResponseHandler) {
123        this.previous = null;
124        this.userRequest = req;
125        this.request = new HttpRequestImpl(req);
126        this.currentreq = request;
127        this.attempts = 0;
128        this.client = client;
129        this.filters = client.filterChain();
130        if (System.getSecurityManager() != null) {
131            this.acc = AccessController.getContext();
132        } else {
133            this.acc = null;
134        }
135        this.execWrapper = new ExecutorWrapper(client.executor(), acc);
136        this.executor = execWrapper.executor();
137        this.multiResponseHandler = multiResponseHandler;
138        this.pushGroup = new PushGroup<>(multiResponseHandler, request);
139        this.exchange = new Exchange<>(request, this);
140        this.responseHandler = pushGroup.mainResponseHandler();
141    }
142
143    public HttpResponseImpl<T> response() throws IOException, InterruptedException {
144        HttpRequestImpl r = request;
145        if (r.duration() != null) {
146            timedEvent = new TimedEvent(r.duration());
147            client.registerTimer(timedEvent);
148        }
149        while (attempts < max_attempts) {
150            try {
151                attempts++;
152                Exchange<T> currExchange = getExchange();
153                requestFilters(r);
154                Response response = currExchange.response();
155                HttpRequestImpl newreq = responseFilters(response);
156                if (newreq == null) {
157                    if (attempts > 1) {
158                        Log.logError("Succeeded on attempt: " + attempts);
159                    }
160                    T body = currExchange.readBody(responseHandler);
161                    cancelTimer();
162                    return new HttpResponseImpl<>(userRequest, response, body, currExchange);
163                }
164                //response.body(HttpResponse.ignoreBody());
165                setExchange(new Exchange<>(newreq, this, acc));
166                r = newreq;
167            } catch (IOException e) {
168                if (cancelled) {
169                    throw new HttpTimeoutException("Request timed out");
170                }
171                throw e;
172            }
173        }
174        cancelTimer();
175        throw new IOException("Retry limit exceeded");
176    }
177
178    CompletableFuture<Void> multiCompletionCF() {
179        return pushGroup.groupResult();
180    }
181
182    private synchronized Exchange<T> getExchange() {
183        return exchange;
184    }
185
186    HttpClientImpl client() {
187        return client;
188    }
189
190    HttpClient.Redirect followRedirects() {
191        return client.followRedirects();
192    }
193
194    HttpClient.Version version() {
195        return request.version().orElse(client.version());
196    }
197
198    private synchronized void setExchange(Exchange<T> exchange) {
199        this.exchange = exchange;
200    }
201
202    private void cancelTimer() {
203        if (timedEvent != null) {
204            client.cancelTimer(timedEvent);
205        }
206    }
207
208    private void requestFilters(HttpRequestImpl r) throws IOException {
209        Log.logTrace("Applying request filters");
210        for (HeaderFilter filter : filters) {
211            Log.logTrace("Applying {0}", filter);
212            filter.request(r, this);
213        }
214        Log.logTrace("All filters applied");
215    }
216
217    private HttpRequestImpl responseFilters(Response response) throws IOException
218    {
219        Log.logTrace("Applying response filters");
220        for (HeaderFilter filter : filters) {
221            Log.logTrace("Applying {0}", filter);
222            HttpRequestImpl newreq = filter.response(response);
223            if (newreq != null) {
224                Log.logTrace("New request: stopping filters");
225                return newreq;
226            }
227        }
228        Log.logTrace("All filters applied");
229        return null;
230    }
231
232    public void cancel() {
233        cancelled = true;
234        getExchange().cancel();
235    }
236
237    public void cancel(IOException cause) {
238        cancelled = true;
239        getExchange().cancel(cause);
240    }
241
242    public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
243        CompletableFuture<Void> start = new MinimalFuture<>();
244        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
245        start.completeAsync( () -> null, executor); // trigger execution
246        return cf;
247    }
248
249    private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
250        return start.thenCompose( v -> responseAsyncImpl())
251            .thenCompose((Response r) -> {
252                Exchange<T> exch = getExchange();
253                return exch.readBodyAsync(responseHandler)
254                        .thenApply((T body) ->  new HttpResponseImpl<>(userRequest, r, body, exch));
255            });
256    }
257
258    CompletableFuture<U> multiResponseAsync() {
259        CompletableFuture<Void> start = new MinimalFuture<>();
260        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
261        CompletableFuture<HttpResponse<T>> mainResponse =
262                cf.thenApply((HttpResponseImpl<T> b) -> {
263                      multiResponseHandler.onResponse(b);
264                      return (HttpResponse<T>)b;
265                   });
266
267        pushGroup.setMainResponse(mainResponse);
268        // set up house-keeping related to multi-response
269        mainResponse.thenAccept((r) -> {
270            // All push promises received by now.
271            pushGroup.noMorePushes(true);
272        });
273        CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
274        start.completeAsync( () -> null, executor); // trigger execution
275        return res;
276    }
277
278    private CompletableFuture<Response> responseAsyncImpl() {
279        CompletableFuture<Response> cf;
280        if (++attempts > max_attempts) {
281            cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
282        } else {
283            if (currentreq.duration() != null) {
284                timedEvent = new TimedEvent(currentreq.duration());
285                client.registerTimer(timedEvent);
286            }
287            try {
288                // 1. Apply request filters
289                requestFilters(currentreq);
290            } catch (IOException e) {
291                return MinimalFuture.failedFuture(e);
292            }
293            Exchange<T> exch = getExchange();
294            // 2. get response
295            cf = exch.responseAsync()
296                .thenCompose((Response response) -> {
297                    HttpRequestImpl newrequest = null;
298                    try {
299                        // 3. Apply response filters
300                        newrequest = responseFilters(response);
301                    } catch (IOException e) {
302                        return MinimalFuture.failedFuture(e);
303                    }
304                    // 4. Check filter result and repeat or continue
305                    if (newrequest == null) {
306                        if (attempts > 1) {
307                            Log.logError("Succeeded on attempt: " + attempts);
308                        }
309                        return MinimalFuture.completedFuture(response);
310                    } else {
311                        currentreq = newrequest;
312                        setExchange(new Exchange<>(currentreq, this, acc));
313                        //reads body off previous, and then waits for next response
314                        return responseAsyncImpl();
315                    }
316                })
317            // 5. Handle errors and cancel any timer set
318            .handle((response, ex) -> {
319                cancelTimer();
320                if (ex == null) {
321                    assert response != null;
322                    return MinimalFuture.completedFuture(response);
323                }
324                // all exceptions thrown are handled here
325                CompletableFuture<Response> error = getExceptionalCF(ex);
326                if (error == null) {
327                    return responseAsyncImpl();
328                } else {
329                    return error;
330                }
331            })
332            .thenCompose(UnaryOperator.identity());
333        }
334        return cf;
335    }
336
337    /**
338     * Take a Throwable and return a suitable CompletableFuture that is
339     * completed exceptionally.
340     */
341    private CompletableFuture<Response> getExceptionalCF(Throwable t) {
342        if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
343            if (t.getCause() != null) {
344                t = t.getCause();
345            }
346        }
347        if (cancelled && t instanceof IOException) {
348            t = new HttpTimeoutException("request timed out");
349        }
350        return MinimalFuture.failedFuture(t);
351    }
352
353    class TimedEvent extends TimeoutEvent {
354        TimedEvent(Duration duration) {
355            super(duration);
356        }
357        @Override
358        public void handle() {
359            cancel(new HttpTimeoutException("request timed out"));
360        }
361    }
362}
363