1/*
2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http;
27
28import javax.net.ssl.SSLContext;
29import javax.net.ssl.SSLParameters;
30import java.io.IOException;
31import java.lang.ref.WeakReference;
32import java.net.Authenticator;
33import java.net.CookieManager;
34import java.net.ProxySelector;
35import java.net.URI;
36import java.nio.channels.ClosedChannelException;
37import java.nio.channels.SelectableChannel;
38import java.nio.channels.SelectionKey;
39import java.nio.channels.Selector;
40import java.nio.channels.SocketChannel;
41import java.security.NoSuchAlgorithmException;
42import java.time.Instant;
43import java.time.temporal.ChronoUnit;
44import java.util.ArrayList;
45import java.util.Iterator;
46import java.util.List;
47import java.util.Optional;
48import java.util.Set;
49import java.util.TreeSet;
50import java.util.concurrent.CompletableFuture;
51import java.util.concurrent.Executor;
52import java.util.concurrent.Executors;
53import java.util.concurrent.ThreadFactory;
54import java.util.stream.Stream;
55import jdk.incubator.http.internal.common.Log;
56import jdk.incubator.http.internal.common.Utils;
57import jdk.incubator.http.internal.websocket.BuilderImpl;
58
59/**
60 * Client implementation. Contains all configuration information and also
61 * the selector manager thread which allows async events to be registered
62 * and delivered when they occur. See AsyncEvent.
63 */
64class HttpClientImpl extends HttpClient {
65
66    // Define the default factory as a static inner class
67    // that embeds all the necessary logic to avoid
68    // the risk of using a lambda that might keep a reference on the
69    // HttpClient instance from which it was created (helps with
70    // heapdump analysis).
71    private static final class DefaultThreadFactory implements ThreadFactory {
72        private DefaultThreadFactory() {}
73        @Override
74        public Thread newThread(Runnable r) {
75            Thread t = new Thread(null, r, "HttpClient_worker", 0, true);
76            t.setDaemon(true);
77            return t;
78        }
79        static final ThreadFactory INSTANCE = new DefaultThreadFactory();
80    }
81
82    private final CookieManager cookieManager;
83    private final Redirect followRedirects;
84    private final ProxySelector proxySelector;
85    private final Authenticator authenticator;
86    private final Version version;
87    private final ConnectionPool connections;
88    private final Executor executor;
89    // Security parameters
90    private final SSLContext sslContext;
91    private final SSLParameters sslParams;
92    private final SelectorManager selmgr;
93    private final FilterFactory filters;
94    private final Http2ClientImpl client2;
95
96    /** A Set of, deadline first, ordered timeout events. */
97    private final TreeSet<TimeoutEvent> timeouts;
98
99    public static HttpClientImpl create(HttpClientBuilderImpl builder) {
100        HttpClientImpl impl = new HttpClientImpl(builder);
101        impl.start();
102        return impl;
103    }
104
105    private HttpClientImpl(HttpClientBuilderImpl builder) {
106        if (builder.sslContext == null) {
107            try {
108                sslContext = SSLContext.getDefault();
109            } catch (NoSuchAlgorithmException ex) {
110                throw new InternalError(ex);
111            }
112        } else {
113            sslContext = builder.sslContext;
114        }
115        Executor ex = builder.executor;
116        if (ex == null) {
117            ex = Executors.newCachedThreadPool(DefaultThreadFactory.INSTANCE);
118        } else {
119            ex = builder.executor;
120        }
121        client2 = new Http2ClientImpl(this);
122        executor = ex;
123        cookieManager = builder.cookieManager;
124        followRedirects = builder.followRedirects == null ?
125                Redirect.NEVER : builder.followRedirects;
126        this.proxySelector = builder.proxy;
127        authenticator = builder.authenticator;
128        if (builder.version == null) {
129            version = HttpClient.Version.HTTP_2;
130        } else {
131            version = builder.version;
132        }
133        if (builder.sslParams == null) {
134            sslParams = getDefaultParams(sslContext);
135        } else {
136            sslParams = builder.sslParams;
137        }
138        connections = new ConnectionPool();
139        connections.start();
140        timeouts = new TreeSet<>();
141        try {
142            selmgr = new SelectorManager(this);
143        } catch (IOException e) {
144            // unlikely
145            throw new InternalError(e);
146        }
147        selmgr.setDaemon(true);
148        filters = new FilterFactory();
149        initFilters();
150    }
151
152    private void start() {
153        selmgr.start();
154    }
155
156    private static SSLParameters getDefaultParams(SSLContext ctx) {
157        SSLParameters params = ctx.getSupportedSSLParameters();
158        params.setProtocols(new String[]{"TLSv1.2"});
159        return params;
160    }
161
162    /**
163     * Wait for activity on given exchange (assuming blocking = false).
164     * It's a no-op if blocking = true. In particular, the following occurs
165     * in the SelectorManager thread.
166     *
167     *  1) mark the connection non-blocking
168     *  2) add to selector
169     *  3) If selector fires for this exchange then
170     *  4)   - mark connection as blocking
171     *  5)   - call AsyncEvent.handle()
172     *
173     * If exchange needs to block again, then call registerEvent() again
174     */
175    void registerEvent(AsyncEvent exchange) throws IOException {
176        selmgr.register(exchange);
177    }
178
179    /**
180     * Only used from RawChannel to disconnect the channel from
181     * the selector
182     */
183    void cancelRegistration(SocketChannel s) {
184        selmgr.cancel(s);
185    }
186
187
188    Http2ClientImpl client2() {
189        return client2;
190    }
191
192    /*
193    @Override
194    public ByteBuffer getBuffer() {
195        return pool.getBuffer();
196    }
197
198    // SSL buffers are larger. Manage separately
199
200    int size = 16 * 1024;
201
202    ByteBuffer getSSLBuffer() {
203        return ByteBuffer.allocate(size);
204    }
205
206    /**
207     * Return a new buffer that's a bit bigger than the given one
208     *
209     * @param buf
210     * @return
211     *
212    ByteBuffer reallocSSLBuffer(ByteBuffer buf) {
213        size = buf.capacity() * 12 / 10; // 20% bigger
214        return ByteBuffer.allocate(size);
215    }
216
217    synchronized void returnSSLBuffer(ByteBuffer buf) {
218        if (buf.capacity() >= size)
219           sslBuffers.add(0, buf);
220    }
221
222    @Override
223    public void returnBuffer(ByteBuffer buffer) {
224        pool.returnBuffer(buffer);
225    }
226    */
227
228    @Override
229    public <T> HttpResponse<T>
230    send(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
231        throws IOException, InterruptedException
232    {
233        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
234        return mex.response();
235    }
236
237    @Override
238    public <T> CompletableFuture<HttpResponse<T>>
239    sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
240    {
241        MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
242        return mex.responseAsync()
243                  .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
244    }
245
246    @Override
247    public <U, T> CompletableFuture<U>
248    sendAsync(HttpRequest req, HttpResponse.MultiProcessor<U, T> responseHandler) {
249        MultiExchange<U,T> mex = new MultiExchange<>(req, this, responseHandler);
250        return mex.multiResponseAsync();
251    }
252
253    // new impl. Should get rid of above
254    /*
255    static class BufferPool implements BufferHandler {
256
257        final LinkedList<ByteBuffer> freelist = new LinkedList<>();
258
259        @Override
260        public synchronized ByteBuffer getBuffer() {
261            ByteBuffer buf;
262
263            while (!freelist.isEmpty()) {
264                buf = freelist.removeFirst();
265                buf.clear();
266                return buf;
267            }
268            return ByteBuffer.allocate(BUFSIZE);
269        }
270
271        @Override
272        public synchronized void returnBuffer(ByteBuffer buffer) {
273            assert buffer.capacity() > 0;
274            freelist.add(buffer);
275        }
276    }
277
278    static BufferPool pool = new BufferPool();
279
280    static BufferHandler pool() {
281        return pool;
282    }
283*/
284    // Main loop for this client's selector
285    private final static class SelectorManager extends Thread {
286
287        private static final long NODEADLINE = 3000L;
288        private final Selector selector;
289        private volatile boolean closed;
290        private final List<AsyncEvent> readyList;
291        private final List<AsyncEvent> registrations;
292
293        // Uses a weak reference to the HttpClient owning this
294        // selector: a strong reference prevents its garbage
295        // collection while the thread is running.
296        // We want the thread to exit gracefully when the
297        // HttpClient that owns it gets GC'ed.
298        WeakReference<HttpClientImpl> ownerRef;
299
300        SelectorManager(HttpClientImpl ref) throws IOException {
301            super(null, null, "SelectorManager", 0, false);
302            ownerRef = new WeakReference<>(ref);
303            readyList = new ArrayList<>();
304            registrations = new ArrayList<>();
305            selector = Selector.open();
306        }
307
308        // This returns immediately. So caller not allowed to send/receive
309        // on connection.
310
311        synchronized void register(AsyncEvent e) throws IOException {
312            registrations.add(e);
313            selector.wakeup();
314        }
315
316        synchronized void cancel(SocketChannel e) {
317            SelectionKey key = e.keyFor(selector);
318            if (key != null) {
319                key.cancel();
320            }
321            selector.wakeup();
322        }
323
324        void wakeupSelector() {
325            selector.wakeup();
326        }
327
328        synchronized void shutdown() {
329            closed = true;
330            try {
331                selector.close();
332            } catch (IOException ignored) { }
333        }
334
335        @Override
336        public void run() {
337            try {
338                while (!Thread.currentThread().isInterrupted()) {
339                    HttpClientImpl client;
340                    synchronized (this) {
341                        for (AsyncEvent exchange : registrations) {
342                            SelectableChannel c = exchange.channel();
343                            try {
344                                c.configureBlocking(false);
345                                SelectionKey key = c.keyFor(selector);
346                                SelectorAttachment sa;
347                                if (key == null || !key.isValid()) {
348                                    if (key != null) {
349                                        // key is canceled.
350                                        // invoke selectNow() to purge it
351                                        // before registering the new event.
352                                        selector.selectNow();
353                                    }
354                                    sa = new SelectorAttachment(c, selector);
355                                } else {
356                                    sa = (SelectorAttachment) key.attachment();
357                                }
358                                sa.register(exchange);
359                            } catch (IOException e) {
360                                Log.logError("HttpClientImpl: " + e);
361                                c.close();
362                                // let the exchange deal with it
363                                handleEvent(exchange);
364                            }
365                        }
366                        registrations.clear();
367                    }
368
369                    // Check whether client is still alive, and if not,
370                    // gracefully stop this thread
371                    if ((client = ownerRef.get()) == null) {
372                        Log.logTrace("HttpClient no longer referenced. Exiting...");
373                        return;
374                    }
375                    long millis = client.purgeTimeoutsAndReturnNextDeadline();
376                    client = null; // don't hold onto the client ref
377
378                    //debugPrint(selector);
379                    // Don't wait for ever as it might prevent the thread to
380                    // stop gracefully. millis will be 0 if no deadline was found.
381                    int n = selector.select(millis == 0 ? NODEADLINE : millis);
382                    if (n == 0) {
383                        // Check whether client is still alive, and if not,
384                        // gracefully stop this thread
385                        if ((client = ownerRef.get()) == null) {
386                            Log.logTrace("HttpClient no longer referenced. Exiting...");
387                            return;
388                        }
389                        client.purgeTimeoutsAndReturnNextDeadline();
390                        client = null; // don't hold onto the client ref
391                        continue;
392                    }
393                    Set<SelectionKey> keys = selector.selectedKeys();
394
395                    for (SelectionKey key : keys) {
396                        SelectorAttachment sa = (SelectorAttachment) key.attachment();
397                        int eventsOccurred = key.readyOps();
398                        sa.events(eventsOccurred).forEach(readyList::add);
399                        sa.resetInterestOps(eventsOccurred);
400                    }
401                    selector.selectNow(); // complete cancellation
402                    selector.selectedKeys().clear();
403
404                    for (AsyncEvent exchange : readyList) {
405                        if (exchange.blocking()) {
406                            exchange.channel().configureBlocking(true);
407                        }
408                        handleEvent(exchange); // will be delegated to executor
409                    }
410                    readyList.clear();
411                }
412            } catch (Throwable e) {
413                if (!closed) {
414                    // This terminates thread. So, better just print stack trace
415                    String err = Utils.stackTrace(e);
416                    Log.logError("HttpClientImpl: fatal error: " + err);
417                }
418            } finally {
419                shutdown();
420            }
421        }
422
423        void debugPrint(Selector selector) {
424            System.err.println("Selector: debugprint start");
425            Set<SelectionKey> keys = selector.keys();
426            for (SelectionKey key : keys) {
427                SelectableChannel c = key.channel();
428                int ops = key.interestOps();
429                System.err.printf("selector chan:%s ops:%d\n", c, ops);
430            }
431            System.err.println("Selector: debugprint end");
432        }
433
434        void handleEvent(AsyncEvent e) {
435            if (closed) {
436                e.abort();
437            } else {
438                e.handle();
439            }
440        }
441    }
442
443    /**
444     * Tracks multiple user level registrations associated with one NIO
445     * registration (SelectionKey). In this implementation, registrations
446     * are one-off and when an event is posted the registration is cancelled
447     * until explicitly registered again.
448     *
449     * <p> No external synchronization required as this class is only used
450     * by the SelectorManager thread. One of these objects required per
451     * connection.
452     */
453    private static class SelectorAttachment {
454        private final SelectableChannel chan;
455        private final Selector selector;
456        private final ArrayList<AsyncEvent> pending;
457        private int interestOps;
458
459        SelectorAttachment(SelectableChannel chan, Selector selector) {
460            this.pending = new ArrayList<>();
461            this.chan = chan;
462            this.selector = selector;
463        }
464
465        void register(AsyncEvent e) throws ClosedChannelException {
466            int newOps = e.interestOps();
467            boolean reRegister = (interestOps & newOps) != newOps;
468            interestOps |= newOps;
469            pending.add(e);
470            if (reRegister) {
471                // first time registration happens here also
472                chan.register(selector, interestOps, this);
473            }
474        }
475
476        /**
477         * Returns a Stream<AsyncEvents> containing only events that are
478         * registered with the given {@code interestOps}.
479         */
480        Stream<AsyncEvent> events(int interestOps) {
481            return pending.stream()
482                    .filter(ev -> (ev.interestOps() & interestOps) != 0);
483        }
484
485        /**
486         * Removes any events with the given {@code interestOps}, and if no
487         * events remaining, cancels the associated SelectionKey.
488         */
489        void resetInterestOps(int interestOps) {
490            int newOps = 0;
491
492            Iterator<AsyncEvent> itr = pending.iterator();
493            while (itr.hasNext()) {
494                AsyncEvent event = itr.next();
495                int evops = event.interestOps();
496                if (event.repeating()) {
497                    newOps |= evops;
498                    continue;
499                }
500                if ((evops & interestOps) != 0) {
501                    itr.remove();
502                } else {
503                    newOps |= evops;
504                }
505            }
506
507            this.interestOps = newOps;
508            SelectionKey key = chan.keyFor(selector);
509            if (newOps == 0) {
510                key.cancel();
511            } else {
512                key.interestOps(newOps);
513            }
514        }
515    }
516
517    @Override
518    public SSLContext sslContext() {
519        Utils.checkNetPermission("getSSLContext");
520        return sslContext;
521    }
522
523    @Override
524    public Optional<SSLParameters> sslParameters() {
525        return Optional.ofNullable(sslParams);
526    }
527
528    @Override
529    public Optional<Authenticator> authenticator() {
530        return Optional.ofNullable(authenticator);
531    }
532
533    @Override
534    public Executor executor() {
535        return executor;
536    }
537
538    ConnectionPool connectionPool() {
539        return connections;
540    }
541
542    @Override
543    public Redirect followRedirects() {
544        return followRedirects;
545    }
546
547
548    @Override
549    public Optional<CookieManager> cookieManager() {
550        return Optional.ofNullable(cookieManager);
551    }
552
553    @Override
554    public Optional<ProxySelector> proxy() {
555        return Optional.ofNullable(this.proxySelector);
556    }
557
558    @Override
559    public WebSocket.Builder newWebSocketBuilder(URI uri,
560                                                 WebSocket.Listener listener) {
561        return new BuilderImpl(this, uri, listener);
562    }
563
564    @Override
565    public Version version() {
566        return version;
567    }
568
569    //private final HashMap<String, Boolean> http2NotSupported = new HashMap<>();
570
571    boolean getHttp2Allowed() {
572        return version.equals(Version.HTTP_2);
573    }
574
575    private void initFilters() {
576        addFilter(AuthenticationFilter.class);
577        addFilter(RedirectFilter.class);
578        if (this.cookieManager != null) {
579            addFilter(CookieFilter.class);
580        }
581    }
582
583    private void addFilter(Class<? extends HeaderFilter> f) {
584        filters.addFilter(f);
585    }
586
587    final List<HeaderFilter> filterChain() {
588        return filters.getFilterChain();
589    }
590
591    // Timer controls.
592    // Timers are implemented through timed Selector.select() calls.
593
594    synchronized void registerTimer(TimeoutEvent event) {
595        Log.logTrace("Registering timer {0}", event);
596        timeouts.add(event);
597        selmgr.wakeupSelector();
598    }
599
600    synchronized void cancelTimer(TimeoutEvent event) {
601        Log.logTrace("Canceling timer {0}", event);
602        timeouts.remove(event);
603    }
604
605    /**
606     * Purges ( handles ) timer events that have passed their deadline, and
607     * returns the amount of time, in milliseconds, until the next earliest
608     * event. A return value of 0 means that there are no events.
609     */
610    private long purgeTimeoutsAndReturnNextDeadline() {
611        long diff = 0L;
612        List<TimeoutEvent> toHandle = null;
613        int remaining = 0;
614        // enter critical section to retrieve the timeout event to handle
615        synchronized(this) {
616            if (timeouts.isEmpty()) return 0L;
617
618            Instant now = Instant.now();
619            Iterator<TimeoutEvent> itr = timeouts.iterator();
620            while (itr.hasNext()) {
621                TimeoutEvent event = itr.next();
622                diff = now.until(event.deadline(), ChronoUnit.MILLIS);
623                if (diff <= 0) {
624                    itr.remove();
625                    toHandle = (toHandle == null) ? new ArrayList<>() : toHandle;
626                    toHandle.add(event);
627                } else {
628                    break;
629                }
630            }
631            remaining = timeouts.size();
632        }
633
634        // can be useful for debugging
635        if (toHandle != null && Log.trace()) {
636            Log.logTrace("purgeTimeoutsAndReturnNextDeadline: handling "
637                    + (toHandle == null ? 0 : toHandle.size()) + " events, "
638                    + "remaining " + remaining
639                    + ", next deadline: " + (diff < 0 ? 0L : diff));
640        }
641
642        // handle timeout events out of critical section
643        if (toHandle != null) {
644            Throwable failed = null;
645            for (TimeoutEvent event : toHandle) {
646                try {
647                   Log.logTrace("Firing timer {0}", event);
648                   event.handle();
649                } catch (Error | RuntimeException e) {
650                    // Not expected. Handle remaining events then throw...
651                    // If e is an OOME or SOE it might simply trigger a new
652                    // error from here - but in this case there's not much we
653                    // could do anyway. Just let it flow...
654                    if (failed == null) failed = e;
655                    else failed.addSuppressed(e);
656                    Log.logTrace("Failed to handle event {0}: {1}", event, e);
657                }
658            }
659            if (failed instanceof Error) throw (Error) failed;
660            if (failed instanceof RuntimeException) throw (RuntimeException) failed;
661        }
662
663        // return time to wait until next event. 0L if there's no more events.
664        return diff < 0 ? 0L : diff;
665    }
666
667    // used for the connection window
668    int getReceiveBufferSize() {
669        return Utils.getIntegerNetProperty(
670                "jdk.httpclient.connectionWindowSize", 256 * 1024
671        );
672    }
673}
674