AsyncCloseAndInterrupt.java revision 2362:00cd9dc3c2b5
1/* 2 * Copyright (c) 2002, 2008, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24/* @test 25 * @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 26 * @summary Comprehensive test of asynchronous closing and interruption 27 * @author Mark Reinhold 28 */ 29 30import java.io.*; 31import java.net.*; 32import java.nio.*; 33import java.nio.channels.*; 34import java.util.*; 35 36 37public class AsyncCloseAndInterrupt { 38 39 static PrintStream log = System.err; 40 41 static void sleep(int ms) { 42 try { 43 Thread.sleep(ms); 44 } catch (InterruptedException x) { } 45 } 46 47 // Wildcard address localized to this machine -- Windoze doesn't allow 48 // connecting to a server socket that was previously bound to a true 49 // wildcard, namely new InetSocketAddress((InetAddress)null, 0). 50 // 51 private static InetSocketAddress wildcardAddress; 52 53 54 // Server socket that blindly accepts all connections 55 56 static ServerSocketChannel acceptor; 57 58 private static void initAcceptor() throws IOException { 59 acceptor = ServerSocketChannel.open(); 60 acceptor.socket().bind(wildcardAddress); 61 62 Thread th = new Thread("Acceptor") { 63 public void run() { 64 try { 65 for (;;) { 66 SocketChannel sc = acceptor.accept(); 67 } 68 } catch (IOException x) { 69 x.printStackTrace(); 70 } 71 } 72 }; 73 74 th.setDaemon(true); 75 th.start(); 76 } 77 78 79 // Server socket that refuses all connections 80 81 static ServerSocketChannel refuser; 82 static List refuserClients = new ArrayList(); 83 84 private static void initRefuser() throws IOException { 85 refuser = ServerSocketChannel.open(); 86 refuser.socket().bind(wildcardAddress); 87 pumpRefuser("Initializing refuser..."); 88 } 89 90 private static void pumpRefuser(String msg) throws IOException { 91 log.print(msg); 92 int n = refuserClients.size(); 93 94 // Saturate the refuser's connection backlog so that further connection 95 // attempts will block 96 // 97 outer: 98 for (;;) { 99 SocketChannel sc = SocketChannel.open(); 100 sc.configureBlocking(false); 101 if (!sc.connect(refuser.socket().getLocalSocketAddress())) { 102 for (int i = 0; i < 20; i++) { 103 Thread.yield(); 104 if (sc.finishConnect()) 105 break; 106 if (i >= 19) 107 break outer; 108 } 109 } 110 // Retain so that finalizer doesn't close 111 refuserClients.add(sc); 112 } 113 114 log.println(" " + (refuserClients.size() - n) + " connections"); 115 } 116 117 118 // Dead pipe source and sink 119 120 static Pipe.SourceChannel deadSource; 121 static Pipe.SinkChannel deadSink; 122 123 private static void initPipes() throws IOException { 124 if (deadSource != null) 125 deadSource.close(); 126 deadSource = Pipe.open().source(); 127 if (deadSink != null) 128 deadSink.close(); 129 deadSink = Pipe.open().sink(); 130 } 131 132 133 // Files 134 135 private static File fifoFile = null; // File that blocks on reads and writes 136 private static File diskFile = null; // Disk file 137 138 private static void initFile() throws Exception { 139 140 diskFile = File.createTempFile("aci", ".tmp"); 141 diskFile.deleteOnExit(); 142 FileChannel fc = new FileOutputStream(diskFile).getChannel(); 143 buffer.clear(); 144 if (fc.write(buffer) != buffer.capacity()) 145 throw new RuntimeException("Cannot create disk file"); 146 fc.close(); 147 148 if (TestUtil.onWindows()) { 149 log.println("WARNING: Cannot completely test FileChannels on Windows"); 150 return; 151 } 152 fifoFile = new File("x.fifo"); 153 if (fifoFile.exists()) { 154 if (!fifoFile.delete()) 155 throw new IOException("Cannot delete existing fifo " + fifoFile); 156 } 157 Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile); 158 if (p.waitFor() != 0) 159 throw new IOException("Error creating fifo"); 160 new RandomAccessFile(fifoFile, "rw").close(); 161 162 } 163 164 165 // Channel factories 166 167 static abstract class ChannelFactory { 168 private final String name; 169 ChannelFactory(String name) { 170 this.name = name; 171 } 172 public String toString() { 173 return name; 174 } 175 abstract InterruptibleChannel create() throws IOException; 176 } 177 178 static ChannelFactory socketChannelFactory 179 = new ChannelFactory("SocketChannel") { 180 InterruptibleChannel create() throws IOException { 181 return SocketChannel.open(); 182 } 183 }; 184 185 static ChannelFactory connectedSocketChannelFactory 186 = new ChannelFactory("SocketChannel") { 187 InterruptibleChannel create() throws IOException { 188 SocketAddress sa = acceptor.socket().getLocalSocketAddress(); 189 return SocketChannel.open(sa); 190 } 191 }; 192 193 static ChannelFactory serverSocketChannelFactory 194 = new ChannelFactory("ServerSocketChannel") { 195 InterruptibleChannel create() throws IOException { 196 ServerSocketChannel ssc = ServerSocketChannel.open(); 197 ssc.socket().bind(wildcardAddress); 198 return ssc; 199 } 200 }; 201 202 static ChannelFactory datagramChannelFactory 203 = new ChannelFactory("DatagramChannel") { 204 InterruptibleChannel create() throws IOException { 205 DatagramChannel dc = DatagramChannel.open(); 206 dc.socket().bind(wildcardAddress); 207 InetAddress ia = InetAddress.getByName("127.0.0.1"); 208 dc.connect(new InetSocketAddress(ia, 80)); 209 return dc; 210 } 211 }; 212 213 static ChannelFactory pipeSourceChannelFactory 214 = new ChannelFactory("Pipe.SourceChannel") { 215 InterruptibleChannel create() throws IOException { 216 // ## arrange to close sink 217 return Pipe.open().source(); 218 } 219 }; 220 221 static ChannelFactory pipeSinkChannelFactory 222 = new ChannelFactory("Pipe.SinkChannel") { 223 InterruptibleChannel create() throws IOException { 224 // ## arrange to close source 225 return Pipe.open().sink(); 226 } 227 }; 228 229 static ChannelFactory fifoFileChannelFactory 230 = new ChannelFactory("FileChannel") { 231 InterruptibleChannel create() throws IOException { 232 return new RandomAccessFile(fifoFile, "rw").getChannel(); 233 } 234 }; 235 236 static ChannelFactory diskFileChannelFactory 237 = new ChannelFactory("FileChannel") { 238 InterruptibleChannel create() throws IOException { 239 return new RandomAccessFile(diskFile, "rw").getChannel(); 240 } 241 }; 242 243 244 // I/O operations 245 246 static abstract class Op { 247 private final String name; 248 protected Op(String name) { 249 this.name = name; 250 } 251 abstract void doIO(InterruptibleChannel ich) throws IOException; 252 void setup() throws IOException { } 253 public String toString() { return name; } 254 } 255 256 static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20); 257 258 static ByteBuffer[] buffers = new ByteBuffer[] { 259 ByteBuffer.allocateDirect(1 << 19), 260 ByteBuffer.allocateDirect(1 << 19) 261 }; 262 263 static void clearBuffers() { 264 buffers[0].clear(); 265 buffers[1].clear(); 266 } 267 268 static void show(Channel ch) { 269 log.print("Channel " + (ch.isOpen() ? "open" : "closed")); 270 if (ch.isOpen() && (ch instanceof SocketChannel)) { 271 SocketChannel sc = (SocketChannel)ch; 272 if (sc.socket().isInputShutdown()) 273 log.print(", input shutdown"); 274 if (sc.socket().isOutputShutdown()) 275 log.print(", output shutdown"); 276 } 277 log.println(); 278 } 279 280 static final Op READ = new Op("read") { 281 void doIO(InterruptibleChannel ich) throws IOException { 282 ReadableByteChannel rbc = (ReadableByteChannel)ich; 283 buffer.clear(); 284 int n = rbc.read(buffer); 285 log.println("Read returned " + n); 286 show(rbc); 287 if (rbc.isOpen() 288 && (n == -1) 289 && (rbc instanceof SocketChannel) 290 && ((SocketChannel)rbc).socket().isInputShutdown()) { 291 return; 292 } 293 throw new RuntimeException("Read succeeded"); 294 } 295 }; 296 297 static final Op READV = new Op("readv") { 298 void doIO(InterruptibleChannel ich) throws IOException { 299 ScatteringByteChannel sbc = (ScatteringByteChannel)ich; 300 clearBuffers(); 301 int n = (int)sbc.read(buffers); 302 log.println("Read returned " + n); 303 show(sbc); 304 if (sbc.isOpen() 305 && (n == -1) 306 && (sbc instanceof SocketChannel) 307 && ((SocketChannel)sbc).socket().isInputShutdown()) { 308 return; 309 } 310 throw new RuntimeException("Read succeeded"); 311 } 312 }; 313 314 static final Op RECEIVE = new Op("receive") { 315 void doIO(InterruptibleChannel ich) throws IOException { 316 DatagramChannel dc = (DatagramChannel)ich; 317 buffer.clear(); 318 dc.receive(buffer); 319 show(dc); 320 throw new RuntimeException("Read succeeded"); 321 } 322 }; 323 324 static final Op WRITE = new Op("write") { 325 void doIO(InterruptibleChannel ich) throws IOException { 326 327 WritableByteChannel wbc = (WritableByteChannel)ich; 328 329 SocketChannel sc = null; 330 if (wbc instanceof SocketChannel) 331 sc = (SocketChannel)wbc; 332 333 int n = 0; 334 for (;;) { 335 buffer.clear(); 336 int d = wbc.write(buffer); 337 n += d; 338 if (!wbc.isOpen()) 339 break; 340 if ((sc != null) && sc.socket().isOutputShutdown()) 341 break; 342 } 343 log.println("Wrote " + n + " bytes"); 344 show(wbc); 345 } 346 }; 347 348 static final Op WRITEV = new Op("writev") { 349 void doIO(InterruptibleChannel ich) throws IOException { 350 351 GatheringByteChannel gbc = (GatheringByteChannel)ich; 352 353 SocketChannel sc = null; 354 if (gbc instanceof SocketChannel) 355 sc = (SocketChannel)gbc; 356 357 int n = 0; 358 for (;;) { 359 clearBuffers(); 360 int d = (int)gbc.write(buffers); 361 n += d; 362 if (!gbc.isOpen()) 363 break; 364 if ((sc != null) && sc.socket().isOutputShutdown()) 365 break; 366 } 367 log.println("Wrote " + n + " bytes"); 368 show(gbc); 369 370 } 371 }; 372 373 static final Op CONNECT = new Op("connect") { 374 void setup() throws IOException { 375 pumpRefuser("Pumping refuser ..."); 376 } 377 void doIO(InterruptibleChannel ich) throws IOException { 378 SocketChannel sc = (SocketChannel)ich; 379 if (sc.connect(refuser.socket().getLocalSocketAddress())) 380 throw new RuntimeException("Connection succeeded"); 381 throw new RuntimeException("Connection did not block"); 382 } 383 }; 384 385 static final Op FINISH_CONNECT = new Op("finishConnect") { 386 void setup() throws IOException { 387 pumpRefuser("Pumping refuser ..."); 388 } 389 void doIO(InterruptibleChannel ich) throws IOException { 390 SocketChannel sc = (SocketChannel)ich; 391 sc.configureBlocking(false); 392 SocketAddress sa = refuser.socket().getLocalSocketAddress(); 393 if (sc.connect(sa)) 394 throw new RuntimeException("Connection succeeded"); 395 sc.configureBlocking(true); 396 if (sc.finishConnect()) 397 throw new RuntimeException("Connection succeeded"); 398 throw new RuntimeException("Connection did not block"); 399 } 400 }; 401 402 static final Op ACCEPT = new Op("accept") { 403 void doIO(InterruptibleChannel ich) throws IOException { 404 ServerSocketChannel ssc = (ServerSocketChannel)ich; 405 ssc.accept(); 406 throw new RuntimeException("Accept succeeded"); 407 } 408 }; 409 410 // Use only with diskFileChannelFactory 411 static final Op TRANSFER_TO = new Op("transferTo") { 412 void doIO(InterruptibleChannel ich) throws IOException { 413 FileChannel fc = (FileChannel)ich; 414 long n = fc.transferTo(0, fc.size(), deadSink); 415 log.println("Transferred " + n + " bytes"); 416 show(fc); 417 } 418 }; 419 420 // Use only with diskFileChannelFactory 421 static final Op TRANSFER_FROM = new Op("transferFrom") { 422 void doIO(InterruptibleChannel ich) throws IOException { 423 FileChannel fc = (FileChannel)ich; 424 long n = fc.transferFrom(deadSource, 0, 1 << 20); 425 log.println("Transferred " + n + " bytes"); 426 show(fc); 427 } 428 }; 429 430 431 432 // Test modes 433 434 static final int TEST_PREINTR = 0; // Interrupt thread before I/O 435 static final int TEST_INTR = 1; // Interrupt thread during I/O 436 static final int TEST_CLOSE = 2; // Close channel during I/O 437 static final int TEST_SHUTI = 3; // Shutdown input during I/O 438 static final int TEST_SHUTO = 4; // Shutdown output during I/O 439 440 static final String[] testName = new String[] { 441 "pre-interrupt", "interrupt", "close", 442 "shutdown-input", "shutdown-output" 443 }; 444 445 446 static class Tester extends TestThread { 447 448 private InterruptibleChannel ch; 449 private Op op; 450 private int test; 451 volatile boolean ready = false; 452 453 protected Tester(ChannelFactory cf, InterruptibleChannel ch, 454 Op op, int test) 455 { 456 super(cf + "/" + op + "/" + testName[test]); 457 this.ch = ch; 458 this.op = op; 459 this.test = test; 460 } 461 462 private void caught(Channel ch, IOException x) { 463 String xn = x.getClass().getName(); 464 switch (test) { 465 466 case TEST_PREINTR: 467 case TEST_INTR: 468 if (!xn.equals("java.nio.channels.ClosedByInterruptException")) 469 throw new RuntimeException("Wrong exception thrown: " + x); 470 break; 471 472 case TEST_CLOSE: 473 case TEST_SHUTO: 474 if (!xn.equals("java.nio.channels.AsynchronousCloseException")) 475 throw new RuntimeException("Wrong exception thrown: " + x); 476 break; 477 478 case TEST_SHUTI: 479 if (TestUtil.onWindows()) 480 break; 481 // FALL THROUGH 482 483 default: 484 throw new Error(x); 485 } 486 487 if (ch.isOpen()) { 488 if (test == TEST_SHUTO) { 489 SocketChannel sc = (SocketChannel)ch; 490 if (!sc.socket().isOutputShutdown()) 491 throw new RuntimeException("Output not shutdown"); 492 } else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) { 493 // Let this case pass -- CBIE applies to other channel 494 } else { 495 throw new RuntimeException("Channel still open"); 496 } 497 } 498 499 log.println("Thrown as expected: " + x); 500 } 501 502 final void go() throws Exception { 503 if (test == TEST_PREINTR) 504 Thread.currentThread().interrupt(); 505 ready = true; 506 try { 507 op.doIO(ch); 508 } catch (ClosedByInterruptException x) { 509 caught(ch, x); 510 } catch (AsynchronousCloseException x) { 511 caught(ch, x); 512 } finally { 513 ch.close(); 514 } 515 } 516 517 } 518 519 520 // Tests 521 522 static void test(ChannelFactory cf, Op op, int test) 523 throws Exception 524 { 525 log.println(); 526 initPipes(); 527 InterruptibleChannel ch = cf.create(); 528 Tester t = new Tester(cf, ch, op, test); 529 log.println(t); 530 op.setup(); 531 t.start(); 532 do { 533 sleep(50); 534 } while (!t.ready); 535 536 sleep(100); 537 538 switch (test) { 539 540 case TEST_INTR: 541 t.interrupt(); 542 break; 543 544 case TEST_CLOSE: 545 ch.close(); 546 break; 547 548 case TEST_SHUTI: 549 if (TestUtil.onWindows()) { 550 log.println("WARNING: Asynchronous shutdown not working on Windows"); 551 ch.close(); 552 } else { 553 ((SocketChannel)ch).socket().shutdownInput(); 554 } 555 break; 556 557 case TEST_SHUTO: 558 if (TestUtil.onWindows()) { 559 log.println("WARNING: Asynchronous shutdown not working on Windows"); 560 ch.close(); 561 } else { 562 ((SocketChannel)ch).socket().shutdownOutput(); 563 } 564 break; 565 566 default: 567 break; 568 } 569 570 t.finishAndThrow(500); 571 } 572 573 574 static void test(ChannelFactory cf, Op op) throws Exception { 575 // Test INTR cases before PREINTER cases since sometimes 576 // interrupted threads can't load classes 577 test(cf, op, TEST_INTR); 578 test(cf, op, TEST_PREINTR); 579 580 // Bugs, see FileChannelImpl for details 581 if (op == TRANSFER_FROM) { 582 log.println("WARNING: transferFrom/close not tested"); 583 return; 584 } 585 if ((op == TRANSFER_TO) && !TestUtil.onWindows()) { 586 log.println("WARNING: transferTo/close not tested"); 587 return; 588 } 589 590 test(cf, op, TEST_CLOSE); 591 } 592 593 static void test(ChannelFactory cf) 594 throws Exception 595 { 596 InterruptibleChannel ch = cf.create(); // Sample channel 597 ch.close(); 598 599 if (ch instanceof ReadableByteChannel) { 600 test(cf, READ); 601 if (ch instanceof SocketChannel) 602 test(cf, READ, TEST_SHUTI); 603 } 604 605 if (ch instanceof ScatteringByteChannel) { 606 test(cf, READV); 607 if (ch instanceof SocketChannel) 608 test(cf, READV, TEST_SHUTI); 609 } 610 611 if (ch instanceof DatagramChannel) { 612 test(cf, RECEIVE); 613 614 // Return here: We can't effectively test writes since, if they 615 // block, they do so only for a fleeting moment unless the network 616 // interface is overloaded. 617 return; 618 619 } 620 621 if (ch instanceof WritableByteChannel) { 622 test(cf, WRITE); 623 if (ch instanceof SocketChannel) 624 test(cf, WRITE, TEST_SHUTO); 625 } 626 627 if (ch instanceof GatheringByteChannel) { 628 test(cf, WRITEV); 629 if (ch instanceof SocketChannel) 630 test(cf, WRITEV, TEST_SHUTO); 631 } 632 633 } 634 635 public static void main(String[] args) throws Exception { 636 637 wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0); 638 initAcceptor(); 639 initRefuser(); 640 initPipes(); 641 initFile(); 642 643 if (TestUtil.onME()) { 644 log.println("WARNING: Cannot test FileChannel transfer operations" 645 + " on Windows 95/98/ME"); 646 } else { 647 test(diskFileChannelFactory, TRANSFER_TO); 648 test(diskFileChannelFactory, TRANSFER_FROM); 649 } 650 if (fifoFile != null) 651 test(fifoFileChannelFactory); 652 653 // Testing positional file reads and writes is impractical: It requires 654 // access to a large file soft-mounted via NFS, and even then isn't 655 // completely guaranteed to work. 656 // 657 // Testing map is impractical and arguably unnecessary: It's 658 // unclear under what conditions mmap(2) will actually block. 659 660 test(connectedSocketChannelFactory); 661 test(socketChannelFactory, CONNECT); 662 test(socketChannelFactory, FINISH_CONNECT); 663 test(serverSocketChannelFactory, ACCEPT); 664 test(datagramChannelFactory); 665 test(pipeSourceChannelFactory); 666 test(pipeSinkChannelFactory); 667 668 } 669 670} 671