1/*
2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package jdk.incubator.http;
27
28import java.io.IOException;
29import java.net.InetSocketAddress;
30import java.net.StandardSocketOptions;
31import java.nio.ByteBuffer;
32import java.nio.channels.SelectableChannel;
33import java.nio.channels.SelectionKey;
34import java.nio.channels.SocketChannel;
35import java.util.concurrent.CompletableFuture;
36import java.util.function.Consumer;
37import java.util.function.Supplier;
38
39import jdk.incubator.http.internal.common.AsyncWriteQueue;
40import jdk.incubator.http.internal.common.ByteBufferReference;
41import jdk.incubator.http.internal.common.Log;
42import jdk.incubator.http.internal.common.MinimalFuture;
43import jdk.incubator.http.internal.common.Utils;
44
/**
 * Plain raw TCP connection made directly to the destination. It operates in
 * one of two modes:
 * 1) Blocking, used by HTTP/1. In this case the connect is actually
 *    non-blocking, but the request is sent blocking. The first byte of a
 *    response is received non-blocking and the remainder of the response is
 *    received blocking.
 * 2) Non-blocking, used by HTTP/2. In this case the connection is actually
 *    opened blocking, but all reads and writes are done non-blocking under
 *    the control of an Http2Connection object.
 */
55class PlainHttpConnection extends HttpConnection implements AsyncConnection {
56
57    protected final SocketChannel chan;
58    private volatile boolean connected;
59    private boolean closed;
60
61    // should be volatile to provide proper synchronization(visibility) action
62    private volatile Consumer<ByteBufferReference> asyncReceiver;
63    private volatile Consumer<Throwable> errorReceiver;
64    private volatile Supplier<ByteBufferReference> readBufferSupplier;
65    private boolean asyncReading;
66
67    private final AsyncWriteQueue asyncOutputQ = new AsyncWriteQueue(this::asyncOutput);
68
69    private final Object reading = new Object();
70
71    @Override
72    public void startReading() {
73        try {
74            synchronized(reading) {
75                asyncReading = true;
76            }
77            client.registerEvent(new ReadEvent());
78        } catch (IOException e) {
79            shutdown();
80        }
81    }
82
83    @Override
84    public void stopAsyncReading() {
85        synchronized(reading) {
86            asyncReading = false;
87        }
88        client.cancelRegistration(chan);
89    }
90
91    class ConnectEvent extends AsyncEvent {
92        CompletableFuture<Void> cf;
93
94        ConnectEvent(CompletableFuture<Void> cf) {
95            super(AsyncEvent.BLOCKING);
96            this.cf = cf;
97        }
98
99        @Override
100        public SelectableChannel channel() {
101            return chan;
102        }
103
104        @Override
105        public int interestOps() {
106            return SelectionKey.OP_CONNECT;
107        }
108
109        @Override
110        public void handle() {
111            try {
112                chan.finishConnect();
113            } catch (IOException e) {
114                cf.completeExceptionally(e);
115                return;
116            }
117            connected = true;
118            cf.complete(null);
119        }
120
121        @Override
122        public void abort() {
123            close();
124        }
125    }
126
127    @Override
128    public CompletableFuture<Void> connectAsync() {
129        CompletableFuture<Void> plainFuture = new MinimalFuture<>();
130        try {
131            chan.configureBlocking(false);
132            chan.connect(address);
133            client.registerEvent(new ConnectEvent(plainFuture));
134        } catch (IOException e) {
135            plainFuture.completeExceptionally(e);
136        }
137        return plainFuture;
138    }
139
140    @Override
141    public void connect() throws IOException {
142        chan.connect(address);
143        connected = true;
144    }
145
146    @Override
147    SocketChannel channel() {
148        return chan;
149    }
150
151    PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
152        super(addr, client);
153        try {
154            this.chan = SocketChannel.open();
155            int bufsize = client.getReceiveBufferSize();
156            chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
157            chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
158        } catch (IOException e) {
159            throw new InternalError(e);
160        }
161    }
162
163    @Override
164    long write(ByteBuffer[] buffers, int start, int number) throws IOException {
165        if (getMode() != Mode.ASYNC) {
166            return chan.write(buffers, start, number);
167        }
168        // async
169        buffers = Utils.reduce(buffers, start, number);
170        long n = Utils.remaining(buffers);
171        asyncOutputQ.put(ByteBufferReference.toReferences(buffers));
172        flushAsync();
173        return n;
174    }
175
176    @Override
177    long write(ByteBuffer buffer) throws IOException {
178        if (getMode() != Mode.ASYNC) {
179            return chan.write(buffer);
180        }
181        // async
182        long n = buffer.remaining();
183        asyncOutputQ.put(ByteBufferReference.toReferences(buffer));
184        flushAsync();
185        return n;
186    }
187
188    // handle registered WriteEvent; invoked from SelectorManager thread
189    void flushRegistered() {
190        if (getMode() == Mode.ASYNC) {
191            try {
192                asyncOutputQ.flushDelayed();
193            } catch (IOException e) {
194                // Only IOException caused by closed Queue is expected here
195                shutdown();
196            }
197        }
198    }
199
200    @Override
201    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
202        if (getMode() != Mode.ASYNC) {
203            chan.write(ByteBufferReference.toBuffers(buffers));
204            ByteBufferReference.clear(buffers);
205        } else {
206            asyncOutputQ.put(buffers);
207        }
208    }
209
210    @Override
211    public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
212        if (getMode() != Mode.ASYNC) {
213            chan.write(ByteBufferReference.toBuffers(buffers));
214            ByteBufferReference.clear(buffers);
215        } else {
216            // Unordered frames are sent before existing frames.
217            asyncOutputQ.putFirst(buffers);
218        }
219    }
220
221    @Override
222    public void flushAsync() throws IOException {
223        if (getMode() == Mode.ASYNC) {
224            asyncOutputQ.flush();
225        }
226    }
227
228    @Override
229    public void enableCallback() {
230        // not used
231        assert false;
232    }
233
234    boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
235        try {
236            ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
237            while (Utils.remaining(bufs) > 0) {
238                long n = chan.write(bufs);
239                if (n == 0) {
240                    delayCallback.setDelayed(refs);
241                    client.registerEvent(new WriteEvent());
242                    return false;
243                }
244            }
245            ByteBufferReference.clear(refs);
246        } catch (IOException e) {
247            shutdown();
248        }
249        return true;
250    }
251
252    @Override
253    public String toString() {
254        return "PlainHttpConnection: " + super.toString();
255    }
256
257    /**
258     * Close this connection
259     */
260    @Override
261    public synchronized void close() {
262        if (closed) {
263            return;
264        }
265        closed = true;
266        try {
267            Log.logError("Closing: " + toString());
268            chan.close();
269        } catch (IOException e) {}
270    }
271
272    @Override
273    void shutdownInput() throws IOException {
274        chan.shutdownInput();
275    }
276
277    @Override
278    void shutdownOutput() throws IOException {
279        chan.shutdownOutput();
280    }
281
282    void shutdown() {
283        close();
284        errorReceiver.accept(new IOException("Connection aborted"));
285    }
286
287    void asyncRead() {
288        synchronized (reading) {
289            try {
290                while (asyncReading) {
291                    ByteBufferReference buf = readBufferSupplier.get();
292                    int n = chan.read(buf.get());
293                    if (n == -1) {
294                        throw new IOException();
295                    }
296                    if (n == 0) {
297                        buf.clear();
298                        return;
299                    }
300                    buf.get().flip();
301                    asyncReceiver.accept(buf);
302                }
303            } catch (IOException e) {
304                shutdown();
305            }
306        }
307    }
308
309    @Override
310    protected ByteBuffer readImpl() throws IOException {
311        ByteBuffer dst = ByteBuffer.allocate(8192);
312        int n = readImpl(dst);
313        if (n > 0) {
314            return dst;
315        } else if (n == 0) {
316            return Utils.EMPTY_BYTEBUFFER;
317        } else {
318            return null;
319        }
320    }
321
322    private int readImpl(ByteBuffer buf) throws IOException {
323        int mark = buf.position();
324        int n;
325        // FIXME: this hack works in conjunction with the corresponding change
326        // in jdk.incubator.http.RawChannel.registerEvent
327        //if ((n = buffer.remaining()) != 0) {
328            //buf.put(buffer);
329        //} else {
330            n = chan.read(buf);
331        //}
332        if (n == -1) {
333            return -1;
334        }
335        Utils.flipToMark(buf, mark);
336        // String s = "Receive (" + n + " bytes) ";
337        //debugPrint(s, buf);
338        return n;
339    }
340
341    @Override
342    ConnectionPool.CacheKey cacheKey() {
343        return new ConnectionPool.CacheKey(address, null);
344    }
345
346    @Override
347    synchronized boolean connected() {
348        return connected;
349    }
350
351    // used for all output in HTTP/2
352    class WriteEvent extends AsyncEvent {
353        WriteEvent() {
354            super(0);
355        }
356
357        @Override
358        public SelectableChannel channel() {
359            return chan;
360        }
361
362        @Override
363        public int interestOps() {
364            return SelectionKey.OP_WRITE;
365        }
366
367        @Override
368        public void handle() {
369            flushRegistered();
370        }
371
372        @Override
373        public void abort() {
374            shutdown();
375        }
376    }
377
378    // used for all input in HTTP/2
379    class ReadEvent extends AsyncEvent {
380        ReadEvent() {
381            super(AsyncEvent.REPEATING); // && !BLOCKING
382        }
383
384        @Override
385        public SelectableChannel channel() {
386            return chan;
387        }
388
389        @Override
390        public int interestOps() {
391            return SelectionKey.OP_READ;
392        }
393
394        @Override
395        public void handle() {
396            asyncRead();
397        }
398
399        @Override
400        public void abort() {
401            shutdown();
402        }
403
404        @Override
405        public String toString() {
406            return super.toString() + "/" + chan;
407        }
408    }
409
410    // used in blocking channels only
411    class ReceiveResponseEvent extends AsyncEvent {
412        CompletableFuture<Void> cf;
413
414        ReceiveResponseEvent(CompletableFuture<Void> cf) {
415            super(AsyncEvent.BLOCKING);
416            this.cf = cf;
417        }
418        @Override
419        public SelectableChannel channel() {
420            return chan;
421        }
422
423        @Override
424        public void handle() {
425            cf.complete(null);
426        }
427
428        @Override
429        public int interestOps() {
430            return SelectionKey.OP_READ;
431        }
432
433        @Override
434        public void abort() {
435            close();
436        }
437
438        @Override
439        public String toString() {
440            return super.toString() + "/" + chan;
441        }
442    }
443
444    @Override
445    boolean isSecure() {
446        return false;
447    }
448
449    @Override
450    boolean isProxied() {
451        return false;
452    }
453
454    @Override
455    public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
456                                  Consumer<Throwable> errorReceiver,
457                                  Supplier<ByteBufferReference> readBufferSupplier) {
458        this.asyncReceiver = asyncReceiver;
459        this.errorReceiver = errorReceiver;
460        this.readBufferSupplier = readBufferSupplier;
461    }
462
463    @Override
464    CompletableFuture<Void> whenReceivingResponse() {
465        CompletableFuture<Void> cf = new MinimalFuture<>();
466        try {
467            ReceiveResponseEvent evt = new ReceiveResponseEvent(cf);
468            client.registerEvent(evt);
469        } catch (IOException e) {
470            cf.completeExceptionally(e);
471        }
472        return cf;
473    }
474}
475