1/*
2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24import java.io.IOException;
25import java.io.InputStream;
26import java.io.InputStreamReader;
27import java.io.Reader;
28import java.net.URI;
29import jdk.incubator.http.HttpClient;
30import jdk.incubator.http.HttpHeaders;
31import jdk.incubator.http.HttpRequest;
32import jdk.incubator.http.HttpResponse;
33import java.nio.ByteBuffer;
34import java.nio.charset.Charset;
35import java.util.Locale;
36import java.util.Optional;
37import java.util.concurrent.ArrayBlockingQueue;
38import java.util.concurrent.BlockingQueue;
39import java.util.concurrent.CompletableFuture;
40import java.util.concurrent.CompletionStage;
41import java.util.concurrent.Flow;
42import java.util.stream.Stream;
43
44/*
45 * @test
46 * @summary An example on how to read a response body with InputStream...
47 * @run main/othervm HttpInputStreamTest
48 * @author daniel fuchs
49 */
50public class HttpInputStreamTest {
51
52    public static boolean DEBUG = Boolean.getBoolean("test.debug");
53
54    /**
55     * A simple HttpResponse.BodyHandler that creates a live
56     * InputStream to read the response body from the underlying ByteBuffer
57     * Flow.
58     * The InputStream is made immediately available for consumption, before
59     * the response body is fully received.
60     */
61    public static class HttpInputStreamHandler
62        implements HttpResponse.BodyHandler<InputStream>    {
63
64        public static final int MAX_BUFFERS_IN_QUEUE = 1;
65
66        private final int maxBuffers;
67
68        public HttpInputStreamHandler() {
69            this(MAX_BUFFERS_IN_QUEUE);
70        }
71
72        public HttpInputStreamHandler(int maxBuffers) {
73            this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers;
74        }
75
76        @Override
77        public synchronized HttpResponse.BodyProcessor<InputStream>
78                apply(int i, HttpHeaders hh) {
79            return new HttpResponseInputStream(maxBuffers);
80        }
81
82        /**
83         * An InputStream built on top of the Flow API.
84         */
85        private static class HttpResponseInputStream extends InputStream
86                    implements HttpResponse.BodyProcessor<InputStream> {
87
88            // An immutable ByteBuffer sentinel to mark that the last byte was received.
89            private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]);
90
91            // A queue of yet unprocessed ByteBuffers received from the flow API.
92            private final BlockingQueue<ByteBuffer> buffers;
93            private volatile Flow.Subscription subscription;
94            private volatile boolean closed;
95            private volatile Throwable failed;
96            private volatile ByteBuffer current;
97
98            HttpResponseInputStream() {
99                this(MAX_BUFFERS_IN_QUEUE);
100            }
101
102            HttpResponseInputStream(int maxBuffers) {
103                int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers;
104                this.buffers = new ArrayBlockingQueue<>(capacity);
105            }
106
107            @Override
108            public CompletionStage<InputStream> getBody() {
109                // Return the stream immediately, before the
110                // response body is received.
111                // This makes it possible for senAsync().get().body()
112                // to complete before the response body is received.
113                return CompletableFuture.completedStage(this);
114            }
115
116            // Returns the current byte buffer to read from.
117            // If the current buffer has no remaining data, will take the
118            // next buffer from the buffers queue, possibly blocking until
119            // a new buffer is made available through the Flow API, or the
120            // end of the flow is reached.
121            private ByteBuffer current() throws IOException {
122                while (current == null || !current.hasRemaining()) {
123                    // Check whether the stream is claused or exhausted
124                    if (closed || failed != null) {
125                        throw new IOException("closed", failed);
126                    }
127                    if (current == LAST) break;
128
129                    try {
130                        // Take a new buffer from the queue, blocking
131                        // if none is available yet...
132                        if (DEBUG) System.err.println("Taking Buffer");
133                        current = buffers.take();
134                        if (DEBUG) System.err.println("Buffer Taken");
135
136                        // Check whether some exception was encountered
137                        // upstream
138                        if (closed || failed != null) {
139                            throw new IOException("closed", failed);
140                        }
141
142                        // Check whether we're done.
143                        if (current == LAST) break;
144
145                        // Inform the producer that it can start sending
146                        // us a new buffer
147                        Flow.Subscription s = subscription;
148                        if (s != null) s.request(1);
149
150                    } catch (InterruptedException ex) {
151                        // continue
152                    }
153                }
154                assert current == LAST || current.hasRemaining();
155                return current;
156            }
157
158            @Override
159            public int read(byte[] bytes, int off, int len) throws IOException {
160                // get the buffer to read from, possibly blocking if
161                // none is available
162                ByteBuffer buffer;
163                if ((buffer = current()) == LAST) return -1;
164
165                // don't attempt to read more than what is available
166                // in the current buffer.
167                int read = Math.min(buffer.remaining(), len);
168                assert read > 0 && read <= buffer.remaining();
169
170                // buffer.get() will do the boundary check for us.
171                buffer.get(bytes, off, read);
172                return read;
173            }
174
175            @Override
176            public int read() throws IOException {
177                ByteBuffer buffer;
178                if ((buffer = current()) == LAST) return -1;
179                return buffer.get() & 0xFF;
180            }
181
182            @Override
183            public void onSubscribe(Flow.Subscription s) {
184                this.subscription = s;
185                s.request(Math.max(2, buffers.remainingCapacity() + 1));
186            }
187
188            @Override
189            public synchronized void onNext(ByteBuffer t) {
190                try {
191                    if (DEBUG) System.err.println("next buffer received");
192                    buffers.put(t);
193                    if (DEBUG) System.err.println("buffered offered");
194                } catch (Exception ex) {
195                    failed = ex;
196                    try {
197                        close();
198                    } catch (IOException ex1) {
199                        // OK
200                    }
201                }
202            }
203
204            @Override
205            public void onError(Throwable thrwbl) {
206                failed = thrwbl;
207            }
208
209            @Override
210            public synchronized void onComplete() {
211                subscription = null;
212                onNext(LAST);
213            }
214
215            @Override
216            public void close() throws IOException {
217                synchronized (this) {
218                    closed = true;
219                    Flow.Subscription s = subscription;
220                    if (s != null) {
221                        s.cancel();
222                    }
223                    subscription = null;
224                }
225                super.close();
226            }
227
228        }
229    }
230
231    /**
232     * Examine the response headers to figure out the charset used to
233     * encode the body content.
234     * If the content type is not textual, returns an empty Optional.
235     * Otherwise, returns the body content's charset, defaulting to
236     * ISO-8859-1 if none is explicitly specified.
237     * @param headers The response headers.
238     * @return The charset to use for decoding the response body, if
239     *         the response body content is text/...
240     */
241    public static Optional<Charset> getCharset(HttpHeaders headers) {
242        Optional<String> contentType = headers.firstValue("Content-Type");
243        Optional<Charset> charset = Optional.empty();
244        if (contentType.isPresent()) {
245            final String[] values = contentType.get().split(";");
246            if (values[0].startsWith("text/")) {
247                charset = Optional.of(Stream.of(values)
248                    .map(x -> x.toLowerCase(Locale.ROOT))
249                    .map(String::trim)
250                    .filter(x -> x.startsWith("charset="))
251                    .map(x -> x.substring("charset=".length()))
252                    .findFirst()
253                    .orElse("ISO-8859-1"))
254                    .map(Charset::forName);
255            }
256        }
257        return charset;
258    }
259
260    public static void main(String[] args) throws Exception {
261        HttpClient client = HttpClient.newHttpClient();
262        HttpRequest request = HttpRequest
263            .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/"))
264            .GET()
265            .build();
266
267        // This example shows how to return an InputStream that can be used to
268        // start reading the response body before the response is fully received.
269        // In comparison, the snipet below (which uses
270        // HttpResponse.BodyHandler.asString()) obviously will not return before the
271        // response body is fully read:
272        //
273        // System.out.println(
274        //    client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body());
275
276        CompletableFuture<HttpResponse<InputStream>> handle =
277            client.sendAsync(request, new HttpInputStreamHandler());
278        if (DEBUG) System.err.println("Request sent");
279
280        HttpResponse<InputStream> pending = handle.get();
281
282        // At this point, the response headers have been received, but the
283        // response body may not have arrived yet. This comes from
284        // the implementation of HttpResponseInputStream::getBody above,
285        // which returns an already completed completion stage, without
286        // waiting for any data.
287        // We can therefore access the headers - and the body, which
288        // is our live InputStream, without waiting...
289        HttpHeaders responseHeaders = pending.headers();
290
291        // Get the charset declared in the response headers.
292        // The optional will be empty if the content type is not
293        // of type text/...
294        Optional<Charset> charset = getCharset(responseHeaders);
295
296        try (InputStream is = pending.body();
297            // We assume a textual content type. Construct an InputStream
298            // Reader with the appropriate Charset.
299            // charset.get() will throw NPE if the content is not textual.
300            Reader r = new InputStreamReader(is, charset.get())) {
301
302            char[] buff = new char[32];
303            int off=0, n=0;
304            if (DEBUG) System.err.println("Start receiving response body");
305            if (DEBUG) System.err.println("Charset: " + charset.get());
306
307            // Start consuming the InputStream as the data arrives.
308            // Will block until there is something to read...
309            while ((n = r.read(buff, off, buff.length - off)) > 0) {
310                assert (buff.length - off) > 0;
311                assert n <= (buff.length - off);
312                if (n == (buff.length - off)) {
313                    System.out.print(buff);
314                    off = 0;
315                } else {
316                    off += n;
317                }
318                assert off < buff.length;
319            }
320
321            // last call to read may not have filled 'buff' completely.
322            // flush out the remaining characters.
323            assert off >= 0 && off < buff.length;
324            for (int i=0; i < off; i++) {
325                System.out.print(buff[i]);
326            }
327
328            // We're done!
329            System.out.println("Done!");
330        }
331    }
332
333}
334