1/*
2 * Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package java.nio.channels;
27
28import java.io.FileInputStream;
29import java.io.FileOutputStream;
30import java.io.InputStream;
31import java.io.OutputStream;
32import java.io.Reader;
33import java.io.Writer;
34import java.io.IOException;
35import java.nio.ByteBuffer;
36import java.nio.charset.Charset;
37import java.nio.charset.CharsetDecoder;
38import java.nio.charset.CharsetEncoder;
39import java.nio.charset.UnsupportedCharsetException;
40import java.nio.channels.spi.AbstractInterruptibleChannel;
41import java.util.Objects;
42import java.util.concurrent.ExecutionException;
43import sun.nio.ch.ChannelInputStream;
44import sun.nio.cs.StreamDecoder;
45import sun.nio.cs.StreamEncoder;
46
47
48/**
49 * Utility methods for channels and streams.
50 *
51 * <p> This class defines static methods that support the interoperation of the
52 * stream classes of the {@link java.io} package with the channel classes
53 * of this package.  </p>
54 *
55 *
56 * @author Mark Reinhold
57 * @author Mike McCloskey
58 * @author JSR-51 Expert Group
59 * @since 1.4
60 */
61
62public final class Channels {
63
64    private Channels() { throw new Error("no instances"); }
65
66    /**
67     * Write all remaining bytes in buffer to the given channel.
68     * If the channel is selectable then it must be configured blocking.
69     */
70    private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
71        throws IOException
72    {
73        while (bb.remaining() > 0) {
74            int n = ch.write(bb);
75            if (n <= 0)
76                throw new RuntimeException("no bytes written");
77        }
78    }
79
80    /**
81     * Write all remaining bytes in buffer to the given channel.
82     *
83     * @throws  IllegalBlockingModeException
84     *          If the channel is selectable and configured non-blocking.
85     */
86    private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
87        throws IOException
88    {
89        if (ch instanceof SelectableChannel) {
90            SelectableChannel sc = (SelectableChannel) ch;
91            synchronized (sc.blockingLock()) {
92                if (!sc.isBlocking())
93                    throw new IllegalBlockingModeException();
94                writeFullyImpl(ch, bb);
95            }
96        } else {
97            writeFullyImpl(ch, bb);
98        }
99    }
100
101    // -- Byte streams from channels --
102
103    /**
104     * Constructs a stream that reads bytes from the given channel.
105     *
106     * <p> The {@code read} methods of the resulting stream will throw an
107     * {@link IllegalBlockingModeException} if invoked while the underlying
108     * channel is in non-blocking mode.  The stream will not be buffered, and
109     * it will not support the {@link InputStream#mark mark} or {@link
110     * InputStream#reset reset} methods.  The stream will be safe for access by
111     * multiple concurrent threads.  Closing the stream will in turn cause the
112     * channel to be closed.  </p>
113     *
114     * @param  ch
115     *         The channel from which bytes will be read
116     *
117     * @return  A new input stream
118     */
119    public static InputStream newInputStream(ReadableByteChannel ch) {
120        Objects.requireNonNull(ch, "ch");
121        return new ChannelInputStream(ch);
122    }
123
124    /**
125     * Constructs a stream that writes bytes to the given channel.
126     *
127     * <p> The {@code write} methods of the resulting stream will throw an
128     * {@link IllegalBlockingModeException} if invoked while the underlying
129     * channel is in non-blocking mode.  The stream will not be buffered.  The
130     * stream will be safe for access by multiple concurrent threads.  Closing
131     * the stream will in turn cause the channel to be closed.  </p>
132     *
133     * @param  ch
134     *         The channel to which bytes will be written
135     *
136     * @return  A new output stream
137     */
138    public static OutputStream newOutputStream(WritableByteChannel ch) {
139        Objects.requireNonNull(ch, "ch");
140
141        return new OutputStream() {
142
143            private ByteBuffer bb;
144            private byte[] bs;       // Invoker's previous array
145            private byte[] b1;
146
147            @Override
148            public synchronized void write(int b) throws IOException {
149                if (b1 == null)
150                    b1 = new byte[1];
151                b1[0] = (byte) b;
152                this.write(b1);
153            }
154
155            @Override
156            public synchronized void write(byte[] bs, int off, int len)
157                    throws IOException
158            {
159                if ((off < 0) || (off > bs.length) || (len < 0) ||
160                    ((off + len) > bs.length) || ((off + len) < 0)) {
161                    throw new IndexOutOfBoundsException();
162                } else if (len == 0) {
163                    return;
164                }
165                ByteBuffer bb = ((this.bs == bs)
166                                 ? this.bb
167                                 : ByteBuffer.wrap(bs));
168                bb.limit(Math.min(off + len, bb.capacity()));
169                bb.position(off);
170                this.bb = bb;
171                this.bs = bs;
172                Channels.writeFully(ch, bb);
173            }
174
175            @Override
176            public void close() throws IOException {
177                ch.close();
178            }
179
180        };
181    }
182
183    /**
184     * Constructs a stream that reads bytes from the given channel.
185     *
186     * <p> The stream will not be buffered, and it will not support the {@link
187     * InputStream#mark mark} or {@link InputStream#reset reset} methods.  The
188     * stream will be safe for access by multiple concurrent threads.  Closing
189     * the stream will in turn cause the channel to be closed.  </p>
190     *
191     * @param  ch
192     *         The channel from which bytes will be read
193     *
194     * @return  A new input stream
195     *
196     * @since 1.7
197     */
198    public static InputStream newInputStream(AsynchronousByteChannel ch) {
199        Objects.requireNonNull(ch, "ch");
200        return new InputStream() {
201
202            private ByteBuffer bb;
203            private byte[] bs;           // Invoker's previous array
204            private byte[] b1;
205
206            @Override
207            public synchronized int read() throws IOException {
208                if (b1 == null)
209                    b1 = new byte[1];
210                int n = this.read(b1);
211                if (n == 1)
212                    return b1[0] & 0xff;
213                return -1;
214            }
215
216            @Override
217            public synchronized int read(byte[] bs, int off, int len)
218                    throws IOException
219            {
220                if ((off < 0) || (off > bs.length) || (len < 0) ||
221                    ((off + len) > bs.length) || ((off + len) < 0)) {
222                    throw new IndexOutOfBoundsException();
223                } else if (len == 0) {
224                    return 0;
225                }
226
227                ByteBuffer bb = ((this.bs == bs)
228                                 ? this.bb
229                                 : ByteBuffer.wrap(bs));
230                bb.position(off);
231                bb.limit(Math.min(off + len, bb.capacity()));
232                this.bb = bb;
233                this.bs = bs;
234
235                boolean interrupted = false;
236                try {
237                    for (;;) {
238                        try {
239                            return ch.read(bb).get();
240                        } catch (ExecutionException ee) {
241                            throw new IOException(ee.getCause());
242                        } catch (InterruptedException ie) {
243                            interrupted = true;
244                        }
245                    }
246                } finally {
247                    if (interrupted)
248                        Thread.currentThread().interrupt();
249                }
250            }
251
252            @Override
253            public void close() throws IOException {
254                ch.close();
255            }
256        };
257    }
258
259    /**
260     * Constructs a stream that writes bytes to the given channel.
261     *
262     * <p> The stream will not be buffered. The stream will be safe for access
263     * by multiple concurrent threads.  Closing the stream will in turn cause
264     * the channel to be closed.  </p>
265     *
266     * @param  ch
267     *         The channel to which bytes will be written
268     *
269     * @return  A new output stream
270     *
271     * @since 1.7
272     */
273    public static OutputStream newOutputStream(AsynchronousByteChannel ch) {
274        Objects.requireNonNull(ch, "ch");
275        return new OutputStream() {
276
277            private ByteBuffer bb;
278            private byte[] bs;   // Invoker's previous array
279            private byte[] b1;
280
281            @Override
282            public synchronized void write(int b) throws IOException {
283                if (b1 == null)
284                    b1 = new byte[1];
285                b1[0] = (byte) b;
286                this.write(b1);
287            }
288
289            @Override
290            public synchronized void write(byte[] bs, int off, int len)
291                    throws IOException
292            {
293                if ((off < 0) || (off > bs.length) || (len < 0) ||
294                    ((off + len) > bs.length) || ((off + len) < 0)) {
295                    throw new IndexOutOfBoundsException();
296                } else if (len == 0) {
297                    return;
298                }
299                ByteBuffer bb = ((this.bs == bs)
300                                 ? this.bb
301                                 : ByteBuffer.wrap(bs));
302                bb.limit(Math.min(off + len, bb.capacity()));
303                bb.position(off);
304                this.bb = bb;
305                this.bs = bs;
306
307                boolean interrupted = false;
308                try {
309                    while (bb.remaining() > 0) {
310                        try {
311                            ch.write(bb).get();
312                        } catch (ExecutionException ee) {
313                            throw new IOException(ee.getCause());
314                        } catch (InterruptedException ie) {
315                            interrupted = true;
316                        }
317                    }
318                } finally {
319                    if (interrupted)
320                        Thread.currentThread().interrupt();
321                }
322            }
323
324            @Override
325            public void close() throws IOException {
326                ch.close();
327            }
328        };
329    }
330
331
332    // -- Channels from streams --
333
334    /**
335     * Constructs a channel that reads bytes from the given stream.
336     *
337     * <p> The resulting channel will not be buffered; it will simply redirect
338     * its I/O operations to the given stream.  Closing the channel will in
339     * turn cause the stream to be closed.  </p>
340     *
341     * @param  in
342     *         The stream from which bytes are to be read
343     *
344     * @return  A new readable byte channel
345     */
346    public static ReadableByteChannel newChannel(InputStream in) {
347        Objects.requireNonNull(in, "in");
348
349        if (in.getClass() == FileInputStream.class) {
350            return ((FileInputStream) in).getChannel();
351        }
352
353        return new ReadableByteChannelImpl(in);
354    }
355
356    private static class ReadableByteChannelImpl
357        extends AbstractInterruptibleChannel    // Not really interruptible
358        implements ReadableByteChannel
359    {
360        private final InputStream in;
361        private static final int TRANSFER_SIZE = 8192;
362        private byte[] buf = new byte[0];
363        private final Object readLock = new Object();
364
365        ReadableByteChannelImpl(InputStream in) {
366            this.in = in;
367        }
368
369        @Override
370        public int read(ByteBuffer dst) throws IOException {
371            int len = dst.remaining();
372            int totalRead = 0;
373            int bytesRead = 0;
374            synchronized (readLock) {
375                while (totalRead < len) {
376                    int bytesToRead = Math.min((len - totalRead),
377                                               TRANSFER_SIZE);
378                    if (buf.length < bytesToRead)
379                        buf = new byte[bytesToRead];
380                    if ((totalRead > 0) && !(in.available() > 0))
381                        break; // block at most once
382                    try {
383                        begin();
384                        bytesRead = in.read(buf, 0, bytesToRead);
385                    } finally {
386                        end(bytesRead > 0);
387                    }
388                    if (bytesRead < 0)
389                        break;
390                    else
391                        totalRead += bytesRead;
392                    dst.put(buf, 0, bytesRead);
393                }
394                if ((bytesRead < 0) && (totalRead == 0))
395                    return -1;
396
397                return totalRead;
398            }
399        }
400
401        @Override
402        protected void implCloseChannel() throws IOException {
403            in.close();
404        }
405    }
406
407
408    /**
409     * Constructs a channel that writes bytes to the given stream.
410     *
411     * <p> The resulting channel will not be buffered; it will simply redirect
412     * its I/O operations to the given stream.  Closing the channel will in
413     * turn cause the stream to be closed.  </p>
414     *
415     * @param  out
416     *         The stream to which bytes are to be written
417     *
418     * @return  A new writable byte channel
419     */
420    public static WritableByteChannel newChannel(OutputStream out) {
421        Objects.requireNonNull(out, "out");
422
423        if (out.getClass() == FileOutputStream.class) {
424            return ((FileOutputStream) out).getChannel();
425        }
426
427        return new WritableByteChannelImpl(out);
428    }
429
430    private static class WritableByteChannelImpl
431        extends AbstractInterruptibleChannel    // Not really interruptible
432        implements WritableByteChannel
433    {
434        private final OutputStream out;
435        private static final int TRANSFER_SIZE = 8192;
436        private byte[] buf = new byte[0];
437        private final Object writeLock = new Object();
438
439        WritableByteChannelImpl(OutputStream out) {
440            this.out = out;
441        }
442
443        @Override
444        public int write(ByteBuffer src) throws IOException {
445            int len = src.remaining();
446            int totalWritten = 0;
447            synchronized (writeLock) {
448                while (totalWritten < len) {
449                    int bytesToWrite = Math.min((len - totalWritten),
450                                                TRANSFER_SIZE);
451                    if (buf.length < bytesToWrite)
452                        buf = new byte[bytesToWrite];
453                    src.get(buf, 0, bytesToWrite);
454                    try {
455                        begin();
456                        out.write(buf, 0, bytesToWrite);
457                    } finally {
458                        end(bytesToWrite > 0);
459                    }
460                    totalWritten += bytesToWrite;
461                }
462                return totalWritten;
463            }
464        }
465
466        @Override
467        protected void implCloseChannel() throws IOException {
468            out.close();
469        }
470    }
471
472
473    // -- Character streams from channels --
474
475    /**
476     * Constructs a reader that decodes bytes from the given channel using the
477     * given decoder.
478     *
479     * <p> The resulting stream will contain an internal input buffer of at
480     * least {@code minBufferCap} bytes.  The stream's {@code read} methods
481     * will, as needed, fill the buffer by reading bytes from the underlying
482     * channel; if the channel is in non-blocking mode when bytes are to be
483     * read then an {@link IllegalBlockingModeException} will be thrown.  The
484     * resulting stream will not otherwise be buffered, and it will not support
485     * the {@link Reader#mark mark} or {@link Reader#reset reset} methods.
486     * Closing the stream will in turn cause the channel to be closed.  </p>
487     *
488     * @param  ch
489     *         The channel from which bytes will be read
490     *
491     * @param  dec
492     *         The charset decoder to be used
493     *
494     * @param  minBufferCap
495     *         The minimum capacity of the internal byte buffer,
496     *         or {@code -1} if an implementation-dependent
497     *         default capacity is to be used
498     *
499     * @return  A new reader
500     */
501    public static Reader newReader(ReadableByteChannel ch,
502                                   CharsetDecoder dec,
503                                   int minBufferCap)
504    {
505        Objects.requireNonNull(ch, "ch");
506        return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap);
507    }
508
509    /**
510     * Constructs a reader that decodes bytes from the given channel according
511     * to the named charset.
512     *
513     * <p> An invocation of this method of the form
514     *
515     * <pre> {@code
516     *     Channels.newReader(ch, csname)
517     * } </pre>
518     *
519     * behaves in exactly the same way as the expression
520     *
521     * <pre> {@code
522     *     Channels.newReader(ch, Charset.forName(csName).newDecoder(), -1)
523     * } </pre>
524     *
525     * @param  ch
526     *         The channel from which bytes will be read
527     *
528     * @param  csName
529     *         The name of the charset to be used
530     *
531     * @return  A new reader
532     *
533     * @throws  UnsupportedCharsetException
534     *          If no support for the named charset is available
535     *          in this instance of the Java virtual machine
536     */
537    public static Reader newReader(ReadableByteChannel ch,
538                                   String csName)
539    {
540        Objects.requireNonNull(csName, "csName");
541        return newReader(ch, Charset.forName(csName).newDecoder(), -1);
542    }
543
544    /**
545     * Constructs a writer that encodes characters using the given encoder and
546     * writes the resulting bytes to the given channel.
547     *
548     * <p> The resulting stream will contain an internal output buffer of at
549     * least {@code minBufferCap} bytes.  The stream's {@code write} methods
550     * will, as needed, flush the buffer by writing bytes to the underlying
551     * channel; if the channel is in non-blocking mode when bytes are to be
552     * written then an {@link IllegalBlockingModeException} will be thrown.
553     * The resulting stream will not otherwise be buffered.  Closing the stream
554     * will in turn cause the channel to be closed.  </p>
555     *
556     * @param  ch
557     *         The channel to which bytes will be written
558     *
559     * @param  enc
560     *         The charset encoder to be used
561     *
562     * @param  minBufferCap
563     *         The minimum capacity of the internal byte buffer,
564     *         or {@code -1} if an implementation-dependent
565     *         default capacity is to be used
566     *
567     * @return  A new writer
568     */
569    public static Writer newWriter(WritableByteChannel ch,
570                                   CharsetEncoder enc,
571                                   int minBufferCap)
572    {
573        Objects.requireNonNull(ch, "ch");
574        return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap);
575    }
576
577    /**
578     * Constructs a writer that encodes characters according to the named
579     * charset and writes the resulting bytes to the given channel.
580     *
581     * <p> An invocation of this method of the form
582     *
583     * <pre> {@code
584     *     Channels.newWriter(ch, csname)
585     * } </pre>
586     *
587     * behaves in exactly the same way as the expression
588     *
589     * <pre> {@code
590     *     Channels.newWriter(ch, Charset.forName(csName).newEncoder(), -1)
591     * } </pre>
592     *
593     * @param  ch
594     *         The channel to which bytes will be written
595     *
596     * @param  csName
597     *         The name of the charset to be used
598     *
599     * @return  A new writer
600     *
601     * @throws  UnsupportedCharsetException
602     *          If no support for the named charset is available
603     *          in this instance of the Java virtual machine
604     */
605    public static Writer newWriter(WritableByteChannel ch,
606                                   String csName)
607    {
608        Objects.requireNonNull(csName, "csName");
609        return newWriter(ch, Charset.forName(csName).newEncoder(), -1);
610    }
611}
612