Basic.java revision 11822:110f7f35760f
1189251Ssam/* 2189251Ssam * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved. 3214734Srpaulo * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4189251Ssam * 5252726Srpaulo * This code is free software; you can redistribute it and/or modify it 6252726Srpaulo * under the terms of the GNU General Public License version 2 only, as 7189251Ssam * published by the Free Software Foundation. 8189251Ssam * 9189251Ssam * This code is distributed in the hope that it will be useful, but WITHOUT 10189251Ssam * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11189251Ssam * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12189251Ssam * version 2 for more details (a copy is included in the LICENSE file that 13189251Ssam * accompanied this code). 14189251Ssam * 15189251Ssam * You should have received a copy of the GNU General Public License version 16189251Ssam * 2 along with this work; if not, write to the Free Software Foundation, 17189251Ssam * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18189251Ssam * 19189251Ssam * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20189251Ssam * or visit www.oracle.com if you need additional information or have any 21189251Ssam * questions. 22189251Ssam */ 23189251Ssam 24189251Ssam/* @test 25189251Ssam * @bug 4607272 6842687 6878369 6944810 7023403 26189251Ssam * @summary Unit test for AsynchronousSocketChannel 27189251Ssam * @run main Basic -skipSlowConnectTest 28189251Ssam * @key randomness 29189251Ssam */ 30214734Srpaulo 31189251Ssamimport java.nio.ByteBuffer; 32214734Srpauloimport java.nio.channels.*; 33189251Ssamimport static java.net.StandardSocketOptions.*; 34189251Ssamimport java.net.*; 35189251Ssamimport java.util.Random; 36189251Ssamimport java.util.concurrent.*; 37189251Ssamimport java.util.concurrent.atomic.*; 38189251Ssamimport java.io.Closeable; 39189251Ssamimport java.io.IOException; 40214734Srpaulo 41189251Ssampublic class Basic { 42214734Srpaulo static final Random rand = new Random(); 43189251Ssam 44214734Srpaulo static boolean skipSlowConnectTest = false; 45214734Srpaulo 46189251Ssam public static void main(String[] args) throws Exception { 47189251Ssam for (String arg: args) { 48189251Ssam switch (arg) { 49189251Ssam case "-skipSlowConnectTest" : 50189251Ssam skipSlowConnectTest = true; 51214734Srpaulo break; 52189251Ssam default: 53214734Srpaulo throw new RuntimeException("Unrecognized argument: " + arg); 54214734Srpaulo } 55189251Ssam } 56189251Ssam 57189251Ssam testBind(); 58189251Ssam testSocketOptions(); 59189251Ssam testConnect(); 60189251Ssam testCloseWhenPending(); 61189251Ssam testCancel(); 62189251Ssam testRead1(); 63189251Ssam testRead2(); 64189251Ssam testRead3(); 65189251Ssam testWrite1(); 66189251Ssam testWrite2(); 67189251Ssam // skip timeout tests until 7052549 is fixed 68189251Ssam if (!System.getProperty("os.name").startsWith("Windows")) 69189251Ssam testTimeout(); 70189251Ssam testShutdown(); 71189251Ssam } 72189251Ssam 73189251Ssam static class Server implements Closeable { 74189251Ssam private final ServerSocketChannel ssc; 75189251Ssam private final InetSocketAddress address; 76189251Ssam 77214734Srpaulo Server() throws IOException { 78189251Ssam ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0)); 79214734Srpaulo 80214734Srpaulo InetAddress lh = InetAddress.getLocalHost(); 81189251Ssam int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort(); 82189251Ssam address = new InetSocketAddress(lh, port); 83189251Ssam } 84189251Ssam 85189251Ssam InetSocketAddress address() { 86189251Ssam return address; 87189251Ssam } 88189251Ssam 89189251Ssam SocketChannel accept() throws IOException { 90189251Ssam return ssc.accept(); 91189251Ssam } 92189251Ssam 93189251Ssam public void close() throws IOException { 94189251Ssam ssc.close(); 95189251Ssam } 96189251Ssam 97189251Ssam } 98189251Ssam 99189251Ssam static void testBind() throws Exception { 100189251Ssam System.out.println("-- bind --"); 101189251Ssam 102189251Ssam try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 103189251Ssam if (ch.getLocalAddress() != null) 104189251Ssam throw new RuntimeException("Local address should be 'null'"); 105189251Ssam ch.bind(new InetSocketAddress(0)); 106189251Ssam 107189251Ssam // check local address after binding 108189251Ssam InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress(); 109189251Ssam if (local.getPort() == 0) 110189251Ssam throw new RuntimeException("Unexpected port"); 111189251Ssam if (!local.getAddress().isAnyLocalAddress()) 112189251Ssam throw new RuntimeException("Not bound to a wildcard address"); 113189251Ssam 114189251Ssam // try to re-bind 115189251Ssam try { 116189251Ssam ch.bind(new InetSocketAddress(0)); 117189251Ssam throw new RuntimeException("AlreadyBoundException expected"); 118189251Ssam } catch (AlreadyBoundException x) { 119189251Ssam } 120189251Ssam } 121189251Ssam 122189251Ssam // check ClosedChannelException 123189251Ssam AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 124189251Ssam ch.close(); 125189251Ssam try { 126189251Ssam ch.bind(new InetSocketAddress(0)); 127189251Ssam throw new RuntimeException("ClosedChannelException expected"); 128189251Ssam } catch (ClosedChannelException x) { 129189251Ssam } 130189251Ssam } 131189251Ssam 132189251Ssam static void testSocketOptions() throws Exception { 133189251Ssam System.out.println("-- socket options --"); 134189251Ssam 135189251Ssam try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 136189251Ssam ch.setOption(SO_RCVBUF, 128*1024) 137252726Srpaulo .setOption(SO_SNDBUF, 128*1024) 138252726Srpaulo .setOption(SO_REUSEADDR, true); 139189251Ssam 140189251Ssam // check SO_SNDBUF/SO_RCVBUF limits 141189251Ssam int before, after; 142189251Ssam before = ch.getOption(SO_SNDBUF); 143189251Ssam after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF); 144189251Ssam if (after < before) 145189251Ssam throw new RuntimeException("setOption caused SO_SNDBUF to decrease"); 146189251Ssam before = ch.getOption(SO_RCVBUF); 147189251Ssam after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF); 148189251Ssam if (after < before) 149189251Ssam throw new RuntimeException("setOption caused SO_RCVBUF to decrease"); 150189251Ssam 151189251Ssam ch.bind(new InetSocketAddress(0)); 152189251Ssam 153189251Ssam // default values 154189251Ssam if (ch.getOption(SO_KEEPALIVE)) 155189251Ssam throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'"); 156189251Ssam if (ch.getOption(TCP_NODELAY)) 157189251Ssam throw new RuntimeException("Default of TCP_NODELAY should be 'false'"); 158189251Ssam 159189251Ssam // set and check 160189251Ssam if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE)) 161189251Ssam throw new RuntimeException("SO_KEEPALIVE did not change"); 162189251Ssam if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY)) 163189251Ssam throw new RuntimeException("SO_KEEPALIVE did not change"); 164189251Ssam 165189251Ssam // read others (can't check as actual value is implementation dependent) 166189251Ssam ch.getOption(SO_RCVBUF); 167189251Ssam ch.getOption(SO_SNDBUF); 168189251Ssam } 169189251Ssam } 170189251Ssam 171189251Ssam static void testConnect() throws Exception { 172189251Ssam System.out.println("-- connect --"); 173189251Ssam 174189251Ssam SocketAddress address; 175189251Ssam 176189251Ssam try (Server server = new Server()) { 177189251Ssam address = server.address(); 178189251Ssam 179189251Ssam // connect to server and check local/remote addresses 180189251Ssam try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 181189251Ssam ch.connect(address).get(); 182189251Ssam // check local address 183189251Ssam if (ch.getLocalAddress() == null) 184189251Ssam throw new RuntimeException("Not bound to local address"); 185189251Ssam 186189251Ssam // check remote address 187189251Ssam InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress(); 188189251Ssam if (remote.getPort() != server.address().getPort()) 189189251Ssam throw new RuntimeException("Connected to unexpected port"); 190189251Ssam if (!remote.getAddress().equals(server.address().getAddress())) 191189251Ssam throw new RuntimeException("Connected to unexpected address"); 192189251Ssam 193189251Ssam // try to connect again 194189251Ssam try { 195189251Ssam ch.connect(server.address()).get(); 196189251Ssam throw new RuntimeException("AlreadyConnectedException expected"); 197189251Ssam } catch (AlreadyConnectedException x) { 198189251Ssam } 199189251Ssam 200189251Ssam // clean-up 201189251Ssam server.accept().close(); 202189251Ssam } 203189251Ssam 204189251Ssam // check that connect fails with ClosedChannelException 205189251Ssam AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 206189251Ssam ch.close(); 207189251Ssam try { 208189251Ssam ch.connect(server.address()).get(); 209189251Ssam throw new RuntimeException("ExecutionException expected"); 210189251Ssam } catch (ExecutionException x) { 211189251Ssam if (!(x.getCause() instanceof ClosedChannelException)) 212189251Ssam throw new RuntimeException("Cause of ClosedChannelException expected"); 213189251Ssam } 214189251Ssam final AtomicReference<Throwable> connectException = new AtomicReference<>(); 215189251Ssam ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { 216189251Ssam public void completed(Void result, Void att) { 217189251Ssam } 218189251Ssam public void failed(Throwable exc, Void att) { 219189251Ssam connectException.set(exc); 220189251Ssam } 221189251Ssam }); 222189251Ssam while (connectException.get() == null) { 223189251Ssam Thread.sleep(100); 224189251Ssam } 225189251Ssam if (!(connectException.get() instanceof ClosedChannelException)) 226189251Ssam throw new RuntimeException("ClosedChannelException expected"); 227189251Ssam } 228189251Ssam 229189251Ssam // test that failure to connect closes the channel 230189251Ssam try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 231189251Ssam try { 232189251Ssam ch.connect(address).get(); 233189251Ssam } catch (ExecutionException x) { 234189251Ssam // failed to establish connection 235189251Ssam if (ch.isOpen()) 236189251Ssam throw new RuntimeException("Channel should be closed"); 237189251Ssam } 238189251Ssam } 239189251Ssam 240189251Ssam // repeat test by connecting to a (probably) non-existent host. This 241189251Ssam // improves the chance that the connect will not fail immediately. 242189251Ssam if (!skipSlowConnectTest) { 243189251Ssam try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) { 244189251Ssam try { 245189251Ssam ch.connect(genSocketAddress()).get(); 246189251Ssam } catch (ExecutionException x) { 247189251Ssam // failed to establish connection 248189251Ssam if (ch.isOpen()) 249189251Ssam throw new RuntimeException("Channel should be closed"); 250189251Ssam } 251189251Ssam } 252189251Ssam } 253189251Ssam } 254189251Ssam 255189251Ssam static void testCloseWhenPending() throws Exception { 256189251Ssam System.out.println("-- asynchronous close when connecting --"); 257189251Ssam 258189251Ssam AsynchronousSocketChannel ch; 259189251Ssam 260189251Ssam // asynchronous close while connecting 261189251Ssam ch = AsynchronousSocketChannel.open(); 262189251Ssam Future<Void> connectResult = ch.connect(genSocketAddress()); 263189251Ssam 264189251Ssam // give time to initiate the connect (SYN) 265189251Ssam Thread.sleep(50); 266189251Ssam 267189251Ssam // close 268189251Ssam ch.close(); 269189251Ssam 270189251Ssam // check that exception is thrown in timely manner 271189251Ssam try { 272189251Ssam connectResult.get(5, TimeUnit.SECONDS); 273189251Ssam } catch (TimeoutException x) { 274189251Ssam throw new RuntimeException("AsynchronousCloseException not thrown"); 275189251Ssam } catch (ExecutionException x) { 276189251Ssam // expected 277189251Ssam } 278214734Srpaulo 279189251Ssam System.out.println("-- asynchronous close when reading --"); 280189251Ssam 281189251Ssam try (Server server = new Server()) { 282189251Ssam ch = AsynchronousSocketChannel.open(); 283189251Ssam ch.connect(server.address()).get(); 284189251Ssam 285189251Ssam ByteBuffer dst = ByteBuffer.allocateDirect(100); 286214734Srpaulo Future<Integer> result = ch.read(dst); 287214734Srpaulo 288189251Ssam // attempt a second read - should fail with ReadPendingException 289189251Ssam ByteBuffer buf = ByteBuffer.allocateDirect(100); 290189251Ssam try { 291189251Ssam ch.read(buf); 292189251Ssam throw new RuntimeException("ReadPendingException expected"); 293189251Ssam } catch (ReadPendingException x) { 294189251Ssam } 295189251Ssam 296189251Ssam // close channel (should cause initial read to complete) 297189251Ssam ch.close(); 298189251Ssam server.accept().close(); 299189251Ssam 300189251Ssam // check that AsynchronousCloseException is thrown 301189251Ssam try { 302189251Ssam result.get(); 303189251Ssam throw new RuntimeException("Should not read"); 304189251Ssam } catch (ExecutionException x) { 305189251Ssam if (!(x.getCause() instanceof AsynchronousCloseException)) 306189251Ssam throw new RuntimeException(x); 307189251Ssam } 308189251Ssam 309189251Ssam System.out.println("-- asynchronous close when writing --"); 310189251Ssam 311189251Ssam ch = AsynchronousSocketChannel.open(); 312189251Ssam ch.connect(server.address()).get(); 313189251Ssam 314189251Ssam final AtomicReference<Throwable> writeException = 315189251Ssam new AtomicReference<Throwable>(); 316189251Ssam 317189251Ssam // write bytes to fill socket buffer 318189251Ssam ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() { 319189251Ssam public void completed(Integer result, AsynchronousSocketChannel ch) { 320189251Ssam ch.write(genBuffer(), ch, this); 321189251Ssam } 322189251Ssam public void failed(Throwable x, AsynchronousSocketChannel ch) { 323189251Ssam writeException.set(x); 324189251Ssam } 325189251Ssam }); 326189251Ssam 327189251Ssam // give time for socket buffer to fill up. 328189251Ssam Thread.sleep(5*1000); 329189251Ssam 330189251Ssam // attempt a concurrent write - should fail with WritePendingException 331189251Ssam try { 332189251Ssam ch.write(genBuffer()); 333189251Ssam throw new RuntimeException("WritePendingException expected"); 334189251Ssam } catch (WritePendingException x) { 335189251Ssam } 336189251Ssam 337189251Ssam // close channel - should cause initial write to complete 338189251Ssam ch.close(); 339189251Ssam server.accept().close(); 340189251Ssam 341189251Ssam // wait for exception 342189251Ssam while (writeException.get() == null) { 343189251Ssam Thread.sleep(100); 344189251Ssam } 345189251Ssam if (!(writeException.get() instanceof AsynchronousCloseException)) 346189251Ssam throw new RuntimeException("AsynchronousCloseException expected"); 347189251Ssam } 348189251Ssam } 349189251Ssam 350189251Ssam static void testCancel() throws Exception { 351189251Ssam System.out.println("-- cancel --"); 352189251Ssam 353189251Ssam try (Server server = new Server()) { 354189251Ssam for (int i=0; i<2; i++) { 355189251Ssam boolean mayInterruptIfRunning = (i == 0) ? false : true; 356189251Ssam 357189251Ssam // establish loopback connection 358189251Ssam AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 359189251Ssam ch.connect(server.address()).get(); 360189251Ssam SocketChannel peer = server.accept(); 361189251Ssam 362189251Ssam // start read operation 363189251Ssam ByteBuffer buf = ByteBuffer.allocate(1); 364189251Ssam Future<Integer> res = ch.read(buf); 365189251Ssam 366189251Ssam // cancel operation 367189251Ssam boolean cancelled = res.cancel(mayInterruptIfRunning); 368189251Ssam 369189251Ssam // check post-conditions 370189251Ssam if (!res.isDone()) 371189251Ssam throw new RuntimeException("isDone should return true"); 372189251Ssam if (res.isCancelled() != cancelled) 373189251Ssam throw new RuntimeException("isCancelled not consistent"); 374189251Ssam try { 375189251Ssam res.get(); 376189251Ssam throw new RuntimeException("CancellationException expected"); 377189251Ssam } catch (CancellationException x) { 378189251Ssam } 379189251Ssam try { 380189251Ssam res.get(1, TimeUnit.SECONDS); 381189251Ssam throw new RuntimeException("CancellationException expected"); 382189251Ssam } catch (CancellationException x) { 383189251Ssam } 384189251Ssam 385189251Ssam // check that the cancel doesn't impact writing to the channel 386189251Ssam if (!mayInterruptIfRunning) { 387189251Ssam buf = ByteBuffer.wrap("a".getBytes()); 388189251Ssam ch.write(buf).get(); 389189251Ssam } 390189251Ssam 391189251Ssam ch.close(); 392189251Ssam peer.close(); 393189251Ssam } 394189251Ssam } 395189251Ssam } 396189251Ssam 397189251Ssam static void testRead1() throws Exception { 398189251Ssam System.out.println("-- read (1) --"); 399189251Ssam 400189251Ssam try (Server server = new Server()) { 401189251Ssam final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 402189251Ssam ch.connect(server.address()).get(); 403189251Ssam 404189251Ssam // read with 0 bytes remaining should complete immediately 405189251Ssam ByteBuffer buf = ByteBuffer.allocate(1); 406189251Ssam buf.put((byte)0); 407189251Ssam int n = ch.read(buf).get(); 408189251Ssam if (n != 0) 409189251Ssam throw new RuntimeException("0 expected"); 410189251Ssam 411189251Ssam // write bytes and close connection 412189251Ssam ByteBuffer src = genBuffer(); 413189251Ssam try (SocketChannel sc = server.accept()) { 414189251Ssam sc.setOption(SO_SNDBUF, src.remaining()); 415189251Ssam while (src.hasRemaining()) 416189251Ssam sc.write(src); 417189251Ssam } 418189251Ssam 419189251Ssam // reads should complete immediately 420189251Ssam final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 421189251Ssam final CountDownLatch latch = new CountDownLatch(1); 422189251Ssam ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 423189251Ssam public void completed(Integer result, Void att) { 424189251Ssam int n = result; 425189251Ssam if (n > 0) { 426189251Ssam ch.read(dst, (Void)null, this); 427189251Ssam } else { 428189251Ssam latch.countDown(); 429189251Ssam } 430189251Ssam } 431189251Ssam public void failed(Throwable exc, Void att) { 432189251Ssam } 433214734Srpaulo }); 434214734Srpaulo 435214734Srpaulo latch.await(); 436214734Srpaulo 437214734Srpaulo // check buffers 438214734Srpaulo src.flip(); 439214734Srpaulo dst.flip(); 440214734Srpaulo if (!src.equals(dst)) { 441214734Srpaulo throw new RuntimeException("Contents differ"); 442214734Srpaulo } 443214734Srpaulo 444214734Srpaulo // close channel 445214734Srpaulo ch.close(); 446214734Srpaulo 447214734Srpaulo // check read fails with ClosedChannelException 448214734Srpaulo try { 449252726Srpaulo ch.read(dst).get(); 450252726Srpaulo throw new RuntimeException("ExecutionException expected"); 451252726Srpaulo } catch (ExecutionException x) { 452252726Srpaulo if (!(x.getCause() instanceof ClosedChannelException)) 453252726Srpaulo throw new RuntimeException("Cause of ClosedChannelException expected"); 454252726Srpaulo } 455252726Srpaulo } 456252726Srpaulo } 457252726Srpaulo 458252726Srpaulo static void testRead2() throws Exception { 459252726Srpaulo System.out.println("-- read (2) --"); 460189251Ssam 461 try (Server server = new Server()) { 462 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 463 ch.connect(server.address()).get(); 464 SocketChannel sc = server.accept(); 465 466 ByteBuffer src = genBuffer(); 467 468 // read until the buffer is full 469 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); 470 final CountDownLatch latch = new CountDownLatch(1); 471 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { 472 public void completed(Integer result, Void att) { 473 if (dst.hasRemaining()) { 474 ch.read(dst, (Void)null, this); 475 } else { 476 latch.countDown(); 477 } 478 } 479 public void failed(Throwable exc, Void att) { 480 } 481 }); 482 483 // trickle the writing 484 do { 485 int rem = src.remaining(); 486 int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100); 487 ByteBuffer buf = ByteBuffer.allocate(size); 488 for (int i=0; i<size; i++) 489 buf.put(src.get()); 490 buf.flip(); 491 Thread.sleep(50 + rand.nextInt(1500)); 492 while (buf.hasRemaining()) 493 sc.write(buf); 494 } while (src.hasRemaining()); 495 496 // wait until ascynrhonous reading has completed 497 latch.await(); 498 499 // check buffers 500 src.flip(); 501 dst.flip(); 502 if (!src.equals(dst)) { 503 throw new RuntimeException("Contents differ"); 504 } 505 506 sc.close(); 507 ch.close(); 508 } 509 } 510 511 // exercise scattering read 512 static void testRead3() throws Exception { 513 System.out.println("-- read (3) --"); 514 515 try (Server server = new Server()) { 516 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 517 ch.connect(server.address()).get(); 518 SocketChannel sc = server.accept(); 519 520 ByteBuffer[] dsts = new ByteBuffer[3]; 521 for (int i=0; i<dsts.length; i++) { 522 dsts[i] = ByteBuffer.allocateDirect(100); 523 } 524 525 // scattering read that completes ascynhronously 526 final CountDownLatch l1 = new CountDownLatch(1); 527 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 528 new CompletionHandler<Long,Void>() { 529 public void completed(Long result, Void att) { 530 long n = result; 531 if (n <= 0) 532 throw new RuntimeException("No bytes read"); 533 l1.countDown(); 534 } 535 public void failed(Throwable exc, Void att) { 536 } 537 }); 538 539 // write some bytes 540 sc.write(genBuffer()); 541 542 // read should now complete 543 l1.await(); 544 545 // write more bytes 546 sc.write(genBuffer()); 547 548 // read should complete immediately 549 for (int i=0; i<dsts.length; i++) { 550 dsts[i].rewind(); 551 } 552 553 final CountDownLatch l2 = new CountDownLatch(1); 554 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, 555 new CompletionHandler<Long,Void>() { 556 public void completed(Long result, Void att) { 557 long n = result; 558 if (n <= 0) 559 throw new RuntimeException("No bytes read"); 560 l2.countDown(); 561 } 562 public void failed(Throwable exc, Void att) { 563 } 564 }); 565 l2.await(); 566 567 ch.close(); 568 sc.close(); 569 } 570 } 571 572 static void testWrite1() throws Exception { 573 System.out.println("-- write (1) --"); 574 575 try (Server server = new Server()) { 576 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 577 ch.connect(server.address()).get(); 578 SocketChannel sc = server.accept(); 579 580 // write with 0 bytes remaining should complete immediately 581 ByteBuffer buf = ByteBuffer.allocate(1); 582 buf.put((byte)0); 583 int n = ch.write(buf).get(); 584 if (n != 0) 585 throw new RuntimeException("0 expected"); 586 587 // write all bytes and close connection when done 588 final ByteBuffer src = genBuffer(); 589 ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { 590 public void completed(Integer result, Void att) { 591 if (src.hasRemaining()) { 592 ch.write(src, (Void)null, this); 593 } else { 594 try { 595 ch.close(); 596 } catch (IOException ignore) { } 597 } 598 } 599 public void failed(Throwable exc, Void att) { 600 } 601 }); 602 603 // read to EOF or buffer full 604 ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); 605 do { 606 n = sc.read(dst); 607 } while (n > 0); 608 sc.close(); 609 610 // check buffers 611 src.flip(); 612 dst.flip(); 613 if (!src.equals(dst)) { 614 throw new RuntimeException("Contents differ"); 615 } 616 617 // check write fails with ClosedChannelException 618 try { 619 ch.read(dst).get(); 620 throw new RuntimeException("ExecutionException expected"); 621 } catch (ExecutionException x) { 622 if (!(x.getCause() instanceof ClosedChannelException)) 623 throw new RuntimeException("Cause of ClosedChannelException expected"); 624 } 625 } 626 } 627 628 // exercise gathering write 629 static void testWrite2() throws Exception { 630 System.out.println("-- write (2) --"); 631 632 try (Server server = new Server()) { 633 final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 634 ch.connect(server.address()).get(); 635 SocketChannel sc = server.accept(); 636 637 // number of bytes written 638 final AtomicLong bytesWritten = new AtomicLong(0); 639 640 // write buffers (should complete immediately) 641 ByteBuffer[] srcs = genBuffers(1); 642 final CountDownLatch l1 = new CountDownLatch(1); 643 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 644 new CompletionHandler<Long,Void>() { 645 public void completed(Long result, Void att) { 646 long n = result; 647 if (n <= 0) 648 throw new RuntimeException("No bytes read"); 649 bytesWritten.addAndGet(n); 650 l1.countDown(); 651 } 652 public void failed(Throwable exc, Void att) { 653 } 654 }); 655 l1.await(); 656 657 // set to true to signal that no more buffers should be written 658 final AtomicBoolean continueWriting = new AtomicBoolean(true); 659 660 // write until socket buffer is full so as to create the conditions 661 // for when a write does not complete immediately 662 srcs = genBuffers(1); 663 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, 664 new CompletionHandler<Long,Void>() { 665 public void completed(Long result, Void att) { 666 long n = result; 667 if (n <= 0) 668 throw new RuntimeException("No bytes written"); 669 bytesWritten.addAndGet(n); 670 if (continueWriting.get()) { 671 ByteBuffer[] srcs = genBuffers(8); 672 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, 673 (Void)null, this); 674 } 675 } 676 public void failed(Throwable exc, Void att) { 677 } 678 }); 679 680 // give time for socket buffer to fill up. 681 Thread.sleep(5*1000); 682 683 // signal handler to stop further writing 684 continueWriting.set(false); 685 686 // read until done 687 ByteBuffer buf = ByteBuffer.allocateDirect(4096); 688 long total = 0L; 689 do { 690 int n = sc.read(buf); 691 if (n <= 0) 692 throw new RuntimeException("No bytes read"); 693 buf.rewind(); 694 total += n; 695 } while (total < bytesWritten.get()); 696 697 ch.close(); 698 sc.close(); 699 } 700 } 701 702 static void testShutdown() throws Exception { 703 System.out.println("-- shutdown--"); 704 705 try (Server server = new Server(); 706 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) 707 { 708 ch.connect(server.address()).get(); 709 try (SocketChannel peer = server.accept()) { 710 ByteBuffer buf = ByteBuffer.allocateDirect(1000); 711 int n; 712 713 // check read 714 ch.shutdownInput(); 715 n = ch.read(buf).get(); 716 if (n != -1) 717 throw new RuntimeException("-1 expected"); 718 // check full with full buffer 719 buf.put(new byte[100]); 720 n = ch.read(buf).get(); 721 if (n != -1) 722 throw new RuntimeException("-1 expected"); 723 724 // check write 725 ch.shutdownOutput(); 726 try { 727 ch.write(buf).get(); 728 throw new RuntimeException("ClosedChannelException expected"); 729 } catch (ExecutionException x) { 730 if (!(x.getCause() instanceof ClosedChannelException)) 731 throw new RuntimeException("ClosedChannelException expected"); 732 } 733 } 734 } 735 } 736 737 static void testTimeout() throws Exception { 738 System.out.println("-- timeouts --"); 739 testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS); 740 testTimeout(-1L, TimeUnit.SECONDS); 741 testTimeout(0L, TimeUnit.SECONDS); 742 testTimeout(2L, TimeUnit.SECONDS); 743 } 744 745 static void testTimeout(final long timeout, final TimeUnit unit) throws Exception { 746 try (Server server = new Server()) { 747 AsynchronousSocketChannel ch = AsynchronousSocketChannel.open(); 748 ch.connect(server.address()).get(); 749 750 ByteBuffer dst = ByteBuffer.allocate(512); 751 752 final AtomicReference<Throwable> readException = new AtomicReference<Throwable>(); 753 754 // this read should timeout if value is > 0 755 ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() { 756 public void completed(Integer result, Void att) { 757 readException.set(new RuntimeException("Should not complete")); 758 } 759 public void failed(Throwable exc, Void att) { 760 readException.set(exc); 761 } 762 }); 763 if (timeout > 0L) { 764 // wait for exception 765 while (readException.get() == null) { 766 Thread.sleep(100); 767 } 768 if (!(readException.get() instanceof InterruptedByTimeoutException)) 769 throw new RuntimeException("InterruptedByTimeoutException expected"); 770 771 // after a timeout then further reading should throw unspecified runtime exception 772 boolean exceptionThrown = false; 773 try { 774 ch.read(dst); 775 } catch (RuntimeException x) { 776 exceptionThrown = true; 777 } 778 if (!exceptionThrown) 779 throw new RuntimeException("RuntimeException expected after timeout."); 780 } else { 781 Thread.sleep(1000); 782 Throwable exc = readException.get(); 783 if (exc != null) 784 throw new RuntimeException(exc); 785 } 786 787 final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>(); 788 789 // write bytes to fill socket buffer 790 ch.write(genBuffer(), timeout, unit, ch, 791 new CompletionHandler<Integer,AsynchronousSocketChannel>() 792 { 793 public void completed(Integer result, AsynchronousSocketChannel ch) { 794 ch.write(genBuffer(), timeout, unit, ch, this); 795 } 796 public void failed(Throwable exc, AsynchronousSocketChannel ch) { 797 writeException.set(exc); 798 } 799 }); 800 if (timeout > 0) { 801 // wait for exception 802 while (writeException.get() == null) { 803 Thread.sleep(100); 804 } 805 if (!(writeException.get() instanceof InterruptedByTimeoutException)) 806 throw new RuntimeException("InterruptedByTimeoutException expected"); 807 808 // after a timeout then further writing should throw unspecified runtime exception 809 boolean exceptionThrown = false; 810 try { 811 ch.write(genBuffer()); 812 } catch (RuntimeException x) { 813 exceptionThrown = true; 814 } 815 if (!exceptionThrown) 816 throw new RuntimeException("RuntimeException expected after timeout."); 817 } else { 818 Thread.sleep(1000); 819 Throwable exc = writeException.get(); 820 if (exc != null) 821 throw new RuntimeException(exc); 822 } 823 824 // clean-up 825 server.accept().close(); 826 ch.close(); 827 } 828 } 829 830 // returns ByteBuffer with random bytes 831 static ByteBuffer genBuffer() { 832 int size = 1024 + rand.nextInt(16000); 833 byte[] buf = new byte[size]; 834 rand.nextBytes(buf); 835 boolean useDirect = rand.nextBoolean(); 836 if (useDirect) { 837 ByteBuffer bb = ByteBuffer.allocateDirect(buf.length); 838 bb.put(buf); 839 bb.flip(); 840 return bb; 841 } else { 842 return ByteBuffer.wrap(buf); 843 } 844 } 845 846 // return ByteBuffer[] with random bytes 847 static ByteBuffer[] genBuffers(int max) { 848 int len = 1; 849 if (max > 1) 850 len += rand.nextInt(max); 851 ByteBuffer[] bufs = new ByteBuffer[len]; 852 for (int i=0; i<len; i++) 853 bufs[i] = genBuffer(); 854 return bufs; 855 } 856 857 // return random SocketAddress 858 static SocketAddress genSocketAddress() { 859 StringBuilder sb = new StringBuilder("10."); 860 sb.append(rand.nextInt(256)); 861 sb.append('.'); 862 sb.append(rand.nextInt(256)); 863 sb.append('.'); 864 sb.append(rand.nextInt(256)); 865 InetAddress rh; 866 try { 867 rh = InetAddress.getByName(sb.toString()); 868 } catch (UnknownHostException x) { 869 throw new InternalError("Should not happen"); 870 } 871 return new InetSocketAddress(rh, rand.nextInt(65535)+1); 872 } 873} 874