1/* 2 * Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26package java.nio.channels; 27 28import java.io.FileInputStream; 29import java.io.FileOutputStream; 30import java.io.InputStream; 31import java.io.OutputStream; 32import java.io.Reader; 33import java.io.Writer; 34import java.io.IOException; 35import java.nio.ByteBuffer; 36import java.nio.charset.Charset; 37import java.nio.charset.CharsetDecoder; 38import java.nio.charset.CharsetEncoder; 39import java.nio.charset.UnsupportedCharsetException; 40import java.nio.channels.spi.AbstractInterruptibleChannel; 41import java.util.Objects; 42import java.util.concurrent.ExecutionException; 43import sun.nio.ch.ChannelInputStream; 44import sun.nio.cs.StreamDecoder; 45import sun.nio.cs.StreamEncoder; 46 47 48/** 49 * Utility methods for channels and streams. 50 * 51 * <p> This class defines static methods that support the interoperation of the 52 * stream classes of the {@link java.io} package with the channel classes 53 * of this package. </p> 54 * 55 * 56 * @author Mark Reinhold 57 * @author Mike McCloskey 58 * @author JSR-51 Expert Group 59 * @since 1.4 60 */ 61 62public final class Channels { 63 64 private Channels() { throw new Error("no instances"); } 65 66 /** 67 * Write all remaining bytes in buffer to the given channel. 68 * If the channel is selectable then it must be configured blocking. 69 */ 70 private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb) 71 throws IOException 72 { 73 while (bb.remaining() > 0) { 74 int n = ch.write(bb); 75 if (n <= 0) 76 throw new RuntimeException("no bytes written"); 77 } 78 } 79 80 /** 81 * Write all remaining bytes in buffer to the given channel. 82 * 83 * @throws IllegalBlockingModeException 84 * If the channel is selectable and configured non-blocking. 85 */ 86 private static void writeFully(WritableByteChannel ch, ByteBuffer bb) 87 throws IOException 88 { 89 if (ch instanceof SelectableChannel) { 90 SelectableChannel sc = (SelectableChannel) ch; 91 synchronized (sc.blockingLock()) { 92 if (!sc.isBlocking()) 93 throw new IllegalBlockingModeException(); 94 writeFullyImpl(ch, bb); 95 } 96 } else { 97 writeFullyImpl(ch, bb); 98 } 99 } 100 101 // -- Byte streams from channels -- 102 103 /** 104 * Constructs a stream that reads bytes from the given channel. 105 * 106 * <p> The {@code read} methods of the resulting stream will throw an 107 * {@link IllegalBlockingModeException} if invoked while the underlying 108 * channel is in non-blocking mode. The stream will not be buffered, and 109 * it will not support the {@link InputStream#mark mark} or {@link 110 * InputStream#reset reset} methods. The stream will be safe for access by 111 * multiple concurrent threads. Closing the stream will in turn cause the 112 * channel to be closed. </p> 113 * 114 * @param ch 115 * The channel from which bytes will be read 116 * 117 * @return A new input stream 118 */ 119 public static InputStream newInputStream(ReadableByteChannel ch) { 120 Objects.requireNonNull(ch, "ch"); 121 return new ChannelInputStream(ch); 122 } 123 124 /** 125 * Constructs a stream that writes bytes to the given channel. 126 * 127 * <p> The {@code write} methods of the resulting stream will throw an 128 * {@link IllegalBlockingModeException} if invoked while the underlying 129 * channel is in non-blocking mode. The stream will not be buffered. The 130 * stream will be safe for access by multiple concurrent threads. Closing 131 * the stream will in turn cause the channel to be closed. </p> 132 * 133 * @param ch 134 * The channel to which bytes will be written 135 * 136 * @return A new output stream 137 */ 138 public static OutputStream newOutputStream(WritableByteChannel ch) { 139 Objects.requireNonNull(ch, "ch"); 140 141 return new OutputStream() { 142 143 private ByteBuffer bb; 144 private byte[] bs; // Invoker's previous array 145 private byte[] b1; 146 147 @Override 148 public synchronized void write(int b) throws IOException { 149 if (b1 == null) 150 b1 = new byte[1]; 151 b1[0] = (byte) b; 152 this.write(b1); 153 } 154 155 @Override 156 public synchronized void write(byte[] bs, int off, int len) 157 throws IOException 158 { 159 if ((off < 0) || (off > bs.length) || (len < 0) || 160 ((off + len) > bs.length) || ((off + len) < 0)) { 161 throw new IndexOutOfBoundsException(); 162 } else if (len == 0) { 163 return; 164 } 165 ByteBuffer bb = ((this.bs == bs) 166 ? this.bb 167 : ByteBuffer.wrap(bs)); 168 bb.limit(Math.min(off + len, bb.capacity())); 169 bb.position(off); 170 this.bb = bb; 171 this.bs = bs; 172 Channels.writeFully(ch, bb); 173 } 174 175 @Override 176 public void close() throws IOException { 177 ch.close(); 178 } 179 180 }; 181 } 182 183 /** 184 * Constructs a stream that reads bytes from the given channel. 185 * 186 * <p> The stream will not be buffered, and it will not support the {@link 187 * InputStream#mark mark} or {@link InputStream#reset reset} methods. The 188 * stream will be safe for access by multiple concurrent threads. Closing 189 * the stream will in turn cause the channel to be closed. </p> 190 * 191 * @param ch 192 * The channel from which bytes will be read 193 * 194 * @return A new input stream 195 * 196 * @since 1.7 197 */ 198 public static InputStream newInputStream(AsynchronousByteChannel ch) { 199 Objects.requireNonNull(ch, "ch"); 200 return new InputStream() { 201 202 private ByteBuffer bb; 203 private byte[] bs; // Invoker's previous array 204 private byte[] b1; 205 206 @Override 207 public synchronized int read() throws IOException { 208 if (b1 == null) 209 b1 = new byte[1]; 210 int n = this.read(b1); 211 if (n == 1) 212 return b1[0] & 0xff; 213 return -1; 214 } 215 216 @Override 217 public synchronized int read(byte[] bs, int off, int len) 218 throws IOException 219 { 220 if ((off < 0) || (off > bs.length) || (len < 0) || 221 ((off + len) > bs.length) || ((off + len) < 0)) { 222 throw new IndexOutOfBoundsException(); 223 } else if (len == 0) { 224 return 0; 225 } 226 227 ByteBuffer bb = ((this.bs == bs) 228 ? this.bb 229 : ByteBuffer.wrap(bs)); 230 bb.position(off); 231 bb.limit(Math.min(off + len, bb.capacity())); 232 this.bb = bb; 233 this.bs = bs; 234 235 boolean interrupted = false; 236 try { 237 for (;;) { 238 try { 239 return ch.read(bb).get(); 240 } catch (ExecutionException ee) { 241 throw new IOException(ee.getCause()); 242 } catch (InterruptedException ie) { 243 interrupted = true; 244 } 245 } 246 } finally { 247 if (interrupted) 248 Thread.currentThread().interrupt(); 249 } 250 } 251 252 @Override 253 public void close() throws IOException { 254 ch.close(); 255 } 256 }; 257 } 258 259 /** 260 * Constructs a stream that writes bytes to the given channel. 261 * 262 * <p> The stream will not be buffered. The stream will be safe for access 263 * by multiple concurrent threads. Closing the stream will in turn cause 264 * the channel to be closed. </p> 265 * 266 * @param ch 267 * The channel to which bytes will be written 268 * 269 * @return A new output stream 270 * 271 * @since 1.7 272 */ 273 public static OutputStream newOutputStream(AsynchronousByteChannel ch) { 274 Objects.requireNonNull(ch, "ch"); 275 return new OutputStream() { 276 277 private ByteBuffer bb; 278 private byte[] bs; // Invoker's previous array 279 private byte[] b1; 280 281 @Override 282 public synchronized void write(int b) throws IOException { 283 if (b1 == null) 284 b1 = new byte[1]; 285 b1[0] = (byte) b; 286 this.write(b1); 287 } 288 289 @Override 290 public synchronized void write(byte[] bs, int off, int len) 291 throws IOException 292 { 293 if ((off < 0) || (off > bs.length) || (len < 0) || 294 ((off + len) > bs.length) || ((off + len) < 0)) { 295 throw new IndexOutOfBoundsException(); 296 } else if (len == 0) { 297 return; 298 } 299 ByteBuffer bb = ((this.bs == bs) 300 ? this.bb 301 : ByteBuffer.wrap(bs)); 302 bb.limit(Math.min(off + len, bb.capacity())); 303 bb.position(off); 304 this.bb = bb; 305 this.bs = bs; 306 307 boolean interrupted = false; 308 try { 309 while (bb.remaining() > 0) { 310 try { 311 ch.write(bb).get(); 312 } catch (ExecutionException ee) { 313 throw new IOException(ee.getCause()); 314 } catch (InterruptedException ie) { 315 interrupted = true; 316 } 317 } 318 } finally { 319 if (interrupted) 320 Thread.currentThread().interrupt(); 321 } 322 } 323 324 @Override 325 public void close() throws IOException { 326 ch.close(); 327 } 328 }; 329 } 330 331 332 // -- Channels from streams -- 333 334 /** 335 * Constructs a channel that reads bytes from the given stream. 336 * 337 * <p> The resulting channel will not be buffered; it will simply redirect 338 * its I/O operations to the given stream. Closing the channel will in 339 * turn cause the stream to be closed. </p> 340 * 341 * @param in 342 * The stream from which bytes are to be read 343 * 344 * @return A new readable byte channel 345 */ 346 public static ReadableByteChannel newChannel(InputStream in) { 347 Objects.requireNonNull(in, "in"); 348 349 if (in.getClass() == FileInputStream.class) { 350 return ((FileInputStream) in).getChannel(); 351 } 352 353 return new ReadableByteChannelImpl(in); 354 } 355 356 private static class ReadableByteChannelImpl 357 extends AbstractInterruptibleChannel // Not really interruptible 358 implements ReadableByteChannel 359 { 360 private final InputStream in; 361 private static final int TRANSFER_SIZE = 8192; 362 private byte[] buf = new byte[0]; 363 private final Object readLock = new Object(); 364 365 ReadableByteChannelImpl(InputStream in) { 366 this.in = in; 367 } 368 369 @Override 370 public int read(ByteBuffer dst) throws IOException { 371 int len = dst.remaining(); 372 int totalRead = 0; 373 int bytesRead = 0; 374 synchronized (readLock) { 375 while (totalRead < len) { 376 int bytesToRead = Math.min((len - totalRead), 377 TRANSFER_SIZE); 378 if (buf.length < bytesToRead) 379 buf = new byte[bytesToRead]; 380 if ((totalRead > 0) && !(in.available() > 0)) 381 break; // block at most once 382 try { 383 begin(); 384 bytesRead = in.read(buf, 0, bytesToRead); 385 } finally { 386 end(bytesRead > 0); 387 } 388 if (bytesRead < 0) 389 break; 390 else 391 totalRead += bytesRead; 392 dst.put(buf, 0, bytesRead); 393 } 394 if ((bytesRead < 0) && (totalRead == 0)) 395 return -1; 396 397 return totalRead; 398 } 399 } 400 401 @Override 402 protected void implCloseChannel() throws IOException { 403 in.close(); 404 } 405 } 406 407 408 /** 409 * Constructs a channel that writes bytes to the given stream. 410 * 411 * <p> The resulting channel will not be buffered; it will simply redirect 412 * its I/O operations to the given stream. Closing the channel will in 413 * turn cause the stream to be closed. </p> 414 * 415 * @param out 416 * The stream to which bytes are to be written 417 * 418 * @return A new writable byte channel 419 */ 420 public static WritableByteChannel newChannel(OutputStream out) { 421 Objects.requireNonNull(out, "out"); 422 423 if (out.getClass() == FileOutputStream.class) { 424 return ((FileOutputStream) out).getChannel(); 425 } 426 427 return new WritableByteChannelImpl(out); 428 } 429 430 private static class WritableByteChannelImpl 431 extends AbstractInterruptibleChannel // Not really interruptible 432 implements WritableByteChannel 433 { 434 private final OutputStream out; 435 private static final int TRANSFER_SIZE = 8192; 436 private byte[] buf = new byte[0]; 437 private final Object writeLock = new Object(); 438 439 WritableByteChannelImpl(OutputStream out) { 440 this.out = out; 441 } 442 443 @Override 444 public int write(ByteBuffer src) throws IOException { 445 int len = src.remaining(); 446 int totalWritten = 0; 447 synchronized (writeLock) { 448 while (totalWritten < len) { 449 int bytesToWrite = Math.min((len - totalWritten), 450 TRANSFER_SIZE); 451 if (buf.length < bytesToWrite) 452 buf = new byte[bytesToWrite]; 453 src.get(buf, 0, bytesToWrite); 454 try { 455 begin(); 456 out.write(buf, 0, bytesToWrite); 457 } finally { 458 end(bytesToWrite > 0); 459 } 460 totalWritten += bytesToWrite; 461 } 462 return totalWritten; 463 } 464 } 465 466 @Override 467 protected void implCloseChannel() throws IOException { 468 out.close(); 469 } 470 } 471 472 473 // -- Character streams from channels -- 474 475 /** 476 * Constructs a reader that decodes bytes from the given channel using the 477 * given decoder. 478 * 479 * <p> The resulting stream will contain an internal input buffer of at 480 * least {@code minBufferCap} bytes. The stream's {@code read} methods 481 * will, as needed, fill the buffer by reading bytes from the underlying 482 * channel; if the channel is in non-blocking mode when bytes are to be 483 * read then an {@link IllegalBlockingModeException} will be thrown. The 484 * resulting stream will not otherwise be buffered, and it will not support 485 * the {@link Reader#mark mark} or {@link Reader#reset reset} methods. 486 * Closing the stream will in turn cause the channel to be closed. </p> 487 * 488 * @param ch 489 * The channel from which bytes will be read 490 * 491 * @param dec 492 * The charset decoder to be used 493 * 494 * @param minBufferCap 495 * The minimum capacity of the internal byte buffer, 496 * or {@code -1} if an implementation-dependent 497 * default capacity is to be used 498 * 499 * @return A new reader 500 */ 501 public static Reader newReader(ReadableByteChannel ch, 502 CharsetDecoder dec, 503 int minBufferCap) 504 { 505 Objects.requireNonNull(ch, "ch"); 506 return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap); 507 } 508 509 /** 510 * Constructs a reader that decodes bytes from the given channel according 511 * to the named charset. 512 * 513 * <p> An invocation of this method of the form 514 * 515 * <pre> {@code 516 * Channels.newReader(ch, csname) 517 * } </pre> 518 * 519 * behaves in exactly the same way as the expression 520 * 521 * <pre> {@code 522 * Channels.newReader(ch, Charset.forName(csName).newDecoder(), -1) 523 * } </pre> 524 * 525 * @param ch 526 * The channel from which bytes will be read 527 * 528 * @param csName 529 * The name of the charset to be used 530 * 531 * @return A new reader 532 * 533 * @throws UnsupportedCharsetException 534 * If no support for the named charset is available 535 * in this instance of the Java virtual machine 536 */ 537 public static Reader newReader(ReadableByteChannel ch, 538 String csName) 539 { 540 Objects.requireNonNull(csName, "csName"); 541 return newReader(ch, Charset.forName(csName).newDecoder(), -1); 542 } 543 544 /** 545 * Constructs a writer that encodes characters using the given encoder and 546 * writes the resulting bytes to the given channel. 547 * 548 * <p> The resulting stream will contain an internal output buffer of at 549 * least {@code minBufferCap} bytes. The stream's {@code write} methods 550 * will, as needed, flush the buffer by writing bytes to the underlying 551 * channel; if the channel is in non-blocking mode when bytes are to be 552 * written then an {@link IllegalBlockingModeException} will be thrown. 553 * The resulting stream will not otherwise be buffered. Closing the stream 554 * will in turn cause the channel to be closed. </p> 555 * 556 * @param ch 557 * The channel to which bytes will be written 558 * 559 * @param enc 560 * The charset encoder to be used 561 * 562 * @param minBufferCap 563 * The minimum capacity of the internal byte buffer, 564 * or {@code -1} if an implementation-dependent 565 * default capacity is to be used 566 * 567 * @return A new writer 568 */ 569 public static Writer newWriter(WritableByteChannel ch, 570 CharsetEncoder enc, 571 int minBufferCap) 572 { 573 Objects.requireNonNull(ch, "ch"); 574 return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap); 575 } 576 577 /** 578 * Constructs a writer that encodes characters according to the named 579 * charset and writes the resulting bytes to the given channel. 580 * 581 * <p> An invocation of this method of the form 582 * 583 * <pre> {@code 584 * Channels.newWriter(ch, csname) 585 * } </pre> 586 * 587 * behaves in exactly the same way as the expression 588 * 589 * <pre> {@code 590 * Channels.newWriter(ch, Charset.forName(csName).newEncoder(), -1) 591 * } </pre> 592 * 593 * @param ch 594 * The channel to which bytes will be written 595 * 596 * @param csName 597 * The name of the charset to be used 598 * 599 * @return A new writer 600 * 601 * @throws UnsupportedCharsetException 602 * If no support for the named charset is available 603 * in this instance of the Java virtual machine 604 */ 605 public static Writer newWriter(WritableByteChannel ch, 606 String csName) 607 { 608 Objects.requireNonNull(csName, "csName"); 609 return newWriter(ch, Charset.forName(csName).newEncoder(), -1); 610 } 611} 612