AsyncCloseAndInterrupt.java revision 2362:00cd9dc3c2b5
1/*
2 * Copyright (c) 2002, 2008, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24/* @test
25 * @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135
26 * @summary Comprehensive test of asynchronous closing and interruption
27 * @author Mark Reinhold
28 */
29
30import java.io.*;
31import java.net.*;
32import java.nio.*;
33import java.nio.channels.*;
34import java.util.*;
35
36
37public class AsyncCloseAndInterrupt {
38
39    static PrintStream log = System.err;
40
41    static void sleep(int ms) {
42        try {
43            Thread.sleep(ms);
44        } catch (InterruptedException x) { }
45    }
46
47    // Wildcard address localized to this machine -- Windoze doesn't allow
48    // connecting to a server socket that was previously bound to a true
49    // wildcard, namely new InetSocketAddress((InetAddress)null, 0).
50    //
51    private static InetSocketAddress wildcardAddress;
52
53
54    // Server socket that blindly accepts all connections
55
56    static ServerSocketChannel acceptor;
57
58    private static void initAcceptor() throws IOException {
59        acceptor = ServerSocketChannel.open();
60        acceptor.socket().bind(wildcardAddress);
61
62        Thread th = new Thread("Acceptor") {
63                public void run() {
64                    try {
65                        for (;;) {
66                            SocketChannel sc = acceptor.accept();
67                        }
68                    } catch (IOException x) {
69                        x.printStackTrace();
70                    }
71                }
72            };
73
74        th.setDaemon(true);
75        th.start();
76    }
77
78
79    // Server socket that refuses all connections
80
81    static ServerSocketChannel refuser;
82    static List refuserClients = new ArrayList();
83
84    private static void initRefuser() throws IOException {
85        refuser = ServerSocketChannel.open();
86        refuser.socket().bind(wildcardAddress);
87        pumpRefuser("Initializing refuser...");
88    }
89
90    private static void pumpRefuser(String msg) throws IOException {
91        log.print(msg);
92        int n = refuserClients.size();
93
94        // Saturate the refuser's connection backlog so that further connection
95        // attempts will block
96        //
97        outer:
98        for (;;) {
99            SocketChannel sc = SocketChannel.open();
100            sc.configureBlocking(false);
101            if (!sc.connect(refuser.socket().getLocalSocketAddress())) {
102                for (int i = 0; i < 20; i++) {
103                    Thread.yield();
104                    if (sc.finishConnect())
105                        break;
106                    if (i >= 19)
107                        break outer;
108                }
109            }
110            // Retain so that finalizer doesn't close
111            refuserClients.add(sc);
112        }
113
114        log.println("  " + (refuserClients.size() - n) + " connections");
115    }
116
117
118    // Dead pipe source and sink
119
120    static Pipe.SourceChannel deadSource;
121    static Pipe.SinkChannel deadSink;
122
123    private static void initPipes() throws IOException {
124        if (deadSource != null)
125            deadSource.close();
126        deadSource = Pipe.open().source();
127        if (deadSink != null)
128            deadSink.close();
129        deadSink = Pipe.open().sink();
130    }
131
132
133    // Files
134
135    private static File fifoFile = null; // File that blocks on reads and writes
136    private static File diskFile = null; // Disk file
137
138    private static void initFile() throws Exception {
139
140        diskFile = File.createTempFile("aci", ".tmp");
141        diskFile.deleteOnExit();
142        FileChannel fc = new FileOutputStream(diskFile).getChannel();
143        buffer.clear();
144        if (fc.write(buffer) != buffer.capacity())
145            throw new RuntimeException("Cannot create disk file");
146        fc.close();
147
148        if (TestUtil.onWindows()) {
149            log.println("WARNING: Cannot completely test FileChannels on Windows");
150            return;
151        }
152        fifoFile = new File("x.fifo");
153        if (fifoFile.exists()) {
154            if (!fifoFile.delete())
155                throw new IOException("Cannot delete existing fifo " + fifoFile);
156        }
157        Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);
158        if (p.waitFor() != 0)
159            throw new IOException("Error creating fifo");
160        new RandomAccessFile(fifoFile, "rw").close();
161
162    }
163
164
165    // Channel factories
166
167    static abstract class ChannelFactory {
168        private final String name;
169        ChannelFactory(String name) {
170            this.name = name;
171        }
172        public String toString() {
173            return name;
174        }
175        abstract InterruptibleChannel create() throws IOException;
176    }
177
178    static ChannelFactory socketChannelFactory
179        = new ChannelFactory("SocketChannel") {
180                InterruptibleChannel create() throws IOException {
181                    return SocketChannel.open();
182                }
183            };
184
185    static ChannelFactory connectedSocketChannelFactory
186        = new ChannelFactory("SocketChannel") {
187                InterruptibleChannel create() throws IOException {
188                    SocketAddress sa = acceptor.socket().getLocalSocketAddress();
189                    return SocketChannel.open(sa);
190                }
191            };
192
193    static ChannelFactory serverSocketChannelFactory
194        = new ChannelFactory("ServerSocketChannel") {
195                InterruptibleChannel create() throws IOException {
196                    ServerSocketChannel ssc = ServerSocketChannel.open();
197                    ssc.socket().bind(wildcardAddress);
198                    return ssc;
199                }
200            };
201
202    static ChannelFactory datagramChannelFactory
203        = new ChannelFactory("DatagramChannel") {
204                InterruptibleChannel create() throws IOException {
205                    DatagramChannel dc = DatagramChannel.open();
206                    dc.socket().bind(wildcardAddress);
207                    InetAddress ia = InetAddress.getByName("127.0.0.1");
208                    dc.connect(new InetSocketAddress(ia, 80));
209                    return dc;
210                }
211            };
212
213    static ChannelFactory pipeSourceChannelFactory
214        = new ChannelFactory("Pipe.SourceChannel") {
215                InterruptibleChannel create() throws IOException {
216                    // ## arrange to close sink
217                    return Pipe.open().source();
218                }
219            };
220
221    static ChannelFactory pipeSinkChannelFactory
222        = new ChannelFactory("Pipe.SinkChannel") {
223                InterruptibleChannel create() throws IOException {
224                    // ## arrange to close source
225                    return Pipe.open().sink();
226                }
227            };
228
229    static ChannelFactory fifoFileChannelFactory
230        = new ChannelFactory("FileChannel") {
231                InterruptibleChannel create() throws IOException {
232                    return new RandomAccessFile(fifoFile, "rw").getChannel();
233                }
234            };
235
236    static ChannelFactory diskFileChannelFactory
237        = new ChannelFactory("FileChannel") {
238                InterruptibleChannel create() throws IOException {
239                    return new RandomAccessFile(diskFile, "rw").getChannel();
240                }
241            };
242
243
244    // I/O operations
245
246    static abstract class Op {
247        private final String name;
248        protected Op(String name) {
249            this.name = name;
250        }
251        abstract void doIO(InterruptibleChannel ich) throws IOException;
252        void setup() throws IOException { }
253        public String toString() { return name; }
254    }
255
256    static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
257
258    static ByteBuffer[] buffers = new ByteBuffer[] {
259        ByteBuffer.allocateDirect(1 << 19),
260        ByteBuffer.allocateDirect(1 << 19)
261    };
262
263    static void clearBuffers() {
264        buffers[0].clear();
265        buffers[1].clear();
266    }
267
268    static void show(Channel ch) {
269        log.print("Channel " + (ch.isOpen() ? "open" : "closed"));
270        if (ch.isOpen() && (ch instanceof SocketChannel)) {
271            SocketChannel sc = (SocketChannel)ch;
272            if (sc.socket().isInputShutdown())
273                log.print(", input shutdown");
274            if (sc.socket().isOutputShutdown())
275                log.print(", output shutdown");
276        }
277        log.println();
278    }
279
280    static final Op READ = new Op("read") {
281            void doIO(InterruptibleChannel ich) throws IOException {
282                ReadableByteChannel rbc = (ReadableByteChannel)ich;
283                buffer.clear();
284                int n = rbc.read(buffer);
285                log.println("Read returned " + n);
286                show(rbc);
287                if     (rbc.isOpen()
288                        && (n == -1)
289                        && (rbc instanceof SocketChannel)
290                        && ((SocketChannel)rbc).socket().isInputShutdown()) {
291                    return;
292                }
293                throw new RuntimeException("Read succeeded");
294            }
295        };
296
297    static final Op READV = new Op("readv") {
298            void doIO(InterruptibleChannel ich) throws IOException {
299                ScatteringByteChannel sbc = (ScatteringByteChannel)ich;
300                clearBuffers();
301                int n = (int)sbc.read(buffers);
302                log.println("Read returned " + n);
303                show(sbc);
304                if     (sbc.isOpen()
305                        && (n == -1)
306                        && (sbc instanceof SocketChannel)
307                        && ((SocketChannel)sbc).socket().isInputShutdown()) {
308                    return;
309                }
310                throw new RuntimeException("Read succeeded");
311            }
312        };
313
314    static final Op RECEIVE = new Op("receive") {
315            void doIO(InterruptibleChannel ich) throws IOException {
316                DatagramChannel dc = (DatagramChannel)ich;
317                buffer.clear();
318                dc.receive(buffer);
319                show(dc);
320                throw new RuntimeException("Read succeeded");
321            }
322        };
323
324    static final Op WRITE = new Op("write") {
325            void doIO(InterruptibleChannel ich) throws IOException {
326
327                WritableByteChannel wbc = (WritableByteChannel)ich;
328
329                SocketChannel sc = null;
330                if (wbc instanceof SocketChannel)
331                    sc = (SocketChannel)wbc;
332
333                int n = 0;
334                for (;;) {
335                    buffer.clear();
336                    int d = wbc.write(buffer);
337                    n += d;
338                    if (!wbc.isOpen())
339                        break;
340                    if ((sc != null) && sc.socket().isOutputShutdown())
341                        break;
342                }
343                log.println("Wrote " + n + " bytes");
344                show(wbc);
345            }
346        };
347
348    static final Op WRITEV = new Op("writev") {
349            void doIO(InterruptibleChannel ich) throws IOException {
350
351                GatheringByteChannel gbc = (GatheringByteChannel)ich;
352
353                SocketChannel sc = null;
354                if (gbc instanceof SocketChannel)
355                    sc = (SocketChannel)gbc;
356
357                int n = 0;
358                for (;;) {
359                    clearBuffers();
360                    int d = (int)gbc.write(buffers);
361                    n += d;
362                    if (!gbc.isOpen())
363                        break;
364                    if ((sc != null) && sc.socket().isOutputShutdown())
365                        break;
366                }
367                log.println("Wrote " + n + " bytes");
368                show(gbc);
369
370            }
371        };
372
373    static final Op CONNECT = new Op("connect") {
374            void setup() throws IOException {
375                pumpRefuser("Pumping refuser ...");
376            }
377            void doIO(InterruptibleChannel ich) throws IOException {
378                SocketChannel sc = (SocketChannel)ich;
379                if (sc.connect(refuser.socket().getLocalSocketAddress()))
380                    throw new RuntimeException("Connection succeeded");
381                throw new RuntimeException("Connection did not block");
382            }
383        };
384
385    static final Op FINISH_CONNECT = new Op("finishConnect") {
386            void setup() throws IOException {
387                pumpRefuser("Pumping refuser ...");
388            }
389            void doIO(InterruptibleChannel ich) throws IOException {
390                SocketChannel sc = (SocketChannel)ich;
391                sc.configureBlocking(false);
392                SocketAddress sa = refuser.socket().getLocalSocketAddress();
393                if (sc.connect(sa))
394                    throw new RuntimeException("Connection succeeded");
395                sc.configureBlocking(true);
396                if (sc.finishConnect())
397                    throw new RuntimeException("Connection succeeded");
398                throw new RuntimeException("Connection did not block");
399            }
400        };
401
402    static final Op ACCEPT = new Op("accept") {
403            void doIO(InterruptibleChannel ich) throws IOException {
404                ServerSocketChannel ssc = (ServerSocketChannel)ich;
405                ssc.accept();
406                throw new RuntimeException("Accept succeeded");
407            }
408        };
409
410    // Use only with diskFileChannelFactory
411    static final Op TRANSFER_TO = new Op("transferTo") {
412            void doIO(InterruptibleChannel ich) throws IOException {
413                FileChannel fc = (FileChannel)ich;
414                long n = fc.transferTo(0, fc.size(), deadSink);
415                log.println("Transferred " + n + " bytes");
416                show(fc);
417            }
418        };
419
420    // Use only with diskFileChannelFactory
421    static final Op TRANSFER_FROM = new Op("transferFrom") {
422            void doIO(InterruptibleChannel ich) throws IOException {
423                FileChannel fc = (FileChannel)ich;
424                long n = fc.transferFrom(deadSource, 0, 1 << 20);
425                log.println("Transferred " + n + " bytes");
426                show(fc);
427            }
428        };
429
430
431
432    // Test modes
433
434    static final int TEST_PREINTR = 0;  // Interrupt thread before I/O
435    static final int TEST_INTR = 1;     // Interrupt thread during I/O
436    static final int TEST_CLOSE = 2;    // Close channel during I/O
437    static final int TEST_SHUTI = 3;    // Shutdown input during I/O
438    static final int TEST_SHUTO = 4;    // Shutdown output during I/O
439
440    static final String[] testName = new String[] {
441        "pre-interrupt", "interrupt", "close",
442        "shutdown-input", "shutdown-output"
443    };
444
445
446    static class Tester extends TestThread {
447
448        private InterruptibleChannel ch;
449        private Op op;
450        private int test;
451        volatile boolean ready = false;
452
453        protected Tester(ChannelFactory cf, InterruptibleChannel ch,
454                         Op op, int test)
455        {
456            super(cf + "/" + op + "/" + testName[test]);
457            this.ch = ch;
458            this.op = op;
459            this.test = test;
460        }
461
462        private void caught(Channel ch, IOException x) {
463            String xn = x.getClass().getName();
464            switch (test) {
465
466            case TEST_PREINTR:
467            case TEST_INTR:
468                if (!xn.equals("java.nio.channels.ClosedByInterruptException"))
469                    throw new RuntimeException("Wrong exception thrown: " + x);
470                break;
471
472            case TEST_CLOSE:
473            case TEST_SHUTO:
474                if (!xn.equals("java.nio.channels.AsynchronousCloseException"))
475                    throw new RuntimeException("Wrong exception thrown: " + x);
476                break;
477
478            case TEST_SHUTI:
479                if (TestUtil.onWindows())
480                    break;
481                // FALL THROUGH
482
483            default:
484                throw new Error(x);
485            }
486
487            if (ch.isOpen()) {
488                if (test == TEST_SHUTO) {
489                    SocketChannel sc = (SocketChannel)ch;
490                    if (!sc.socket().isOutputShutdown())
491                        throw new RuntimeException("Output not shutdown");
492                } else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {
493                    // Let this case pass -- CBIE applies to other channel
494                } else {
495                    throw new RuntimeException("Channel still open");
496                }
497            }
498
499            log.println("Thrown as expected: " + x);
500        }
501
502        final void go() throws Exception {
503            if (test == TEST_PREINTR)
504                Thread.currentThread().interrupt();
505            ready = true;
506            try {
507                op.doIO(ch);
508            } catch (ClosedByInterruptException x) {
509                caught(ch, x);
510            } catch (AsynchronousCloseException x) {
511                caught(ch, x);
512            } finally {
513                ch.close();
514            }
515        }
516
517    }
518
519
520    // Tests
521
522    static void test(ChannelFactory cf, Op op, int test)
523        throws Exception
524    {
525        log.println();
526        initPipes();
527        InterruptibleChannel ch = cf.create();
528        Tester t = new Tester(cf, ch, op, test);
529        log.println(t);
530        op.setup();
531        t.start();
532        do {
533            sleep(50);
534        } while (!t.ready);
535
536        sleep(100);
537
538        switch (test) {
539
540        case TEST_INTR:
541            t.interrupt();
542            break;
543
544        case TEST_CLOSE:
545            ch.close();
546            break;
547
548        case TEST_SHUTI:
549            if (TestUtil.onWindows()) {
550                log.println("WARNING: Asynchronous shutdown not working on Windows");
551                ch.close();
552            } else {
553                ((SocketChannel)ch).socket().shutdownInput();
554            }
555            break;
556
557        case TEST_SHUTO:
558            if (TestUtil.onWindows()) {
559                log.println("WARNING: Asynchronous shutdown not working on Windows");
560                ch.close();
561            } else {
562                ((SocketChannel)ch).socket().shutdownOutput();
563            }
564            break;
565
566        default:
567            break;
568        }
569
570        t.finishAndThrow(500);
571    }
572
573
574    static void test(ChannelFactory cf, Op op) throws Exception {
575        // Test INTR cases before PREINTER cases since sometimes
576        // interrupted threads can't load classes
577        test(cf, op, TEST_INTR);
578        test(cf, op, TEST_PREINTR);
579
580        // Bugs, see FileChannelImpl for details
581        if (op == TRANSFER_FROM) {
582            log.println("WARNING: transferFrom/close not tested");
583            return;
584        }
585        if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {
586            log.println("WARNING: transferTo/close not tested");
587            return;
588        }
589
590        test(cf, op, TEST_CLOSE);
591    }
592
593    static void test(ChannelFactory cf)
594        throws Exception
595    {
596        InterruptibleChannel ch = cf.create(); // Sample channel
597        ch.close();
598
599        if (ch instanceof ReadableByteChannel) {
600            test(cf, READ);
601            if (ch instanceof SocketChannel)
602                test(cf, READ, TEST_SHUTI);
603        }
604
605        if (ch instanceof ScatteringByteChannel) {
606            test(cf, READV);
607            if (ch instanceof SocketChannel)
608                test(cf, READV, TEST_SHUTI);
609        }
610
611        if (ch instanceof DatagramChannel) {
612            test(cf, RECEIVE);
613
614            // Return here: We can't effectively test writes since, if they
615            // block, they do so only for a fleeting moment unless the network
616            // interface is overloaded.
617            return;
618
619        }
620
621        if (ch instanceof WritableByteChannel) {
622            test(cf, WRITE);
623            if (ch instanceof SocketChannel)
624                test(cf, WRITE, TEST_SHUTO);
625        }
626
627        if (ch instanceof GatheringByteChannel) {
628            test(cf, WRITEV);
629            if (ch instanceof SocketChannel)
630                test(cf, WRITEV, TEST_SHUTO);
631        }
632
633    }
634
635    public static void main(String[] args) throws Exception {
636
637        wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
638        initAcceptor();
639        initRefuser();
640        initPipes();
641        initFile();
642
643        if (TestUtil.onME()) {
644            log.println("WARNING: Cannot test FileChannel transfer operations"
645                        + " on Windows 95/98/ME");
646        } else {
647            test(diskFileChannelFactory, TRANSFER_TO);
648            test(diskFileChannelFactory, TRANSFER_FROM);
649        }
650        if (fifoFile != null)
651            test(fifoFileChannelFactory);
652
653        // Testing positional file reads and writes is impractical: It requires
654        // access to a large file soft-mounted via NFS, and even then isn't
655        // completely guaranteed to work.
656        //
657        // Testing map is impractical and arguably unnecessary: It's
658        // unclear under what conditions mmap(2) will actually block.
659
660        test(connectedSocketChannelFactory);
661        test(socketChannelFactory, CONNECT);
662        test(socketChannelFactory, FINISH_CONNECT);
663        test(serverSocketChannelFactory, ACCEPT);
664        test(datagramChannelFactory);
665        test(pipeSourceChannelFactory);
666        test(pipeSinkChannelFactory);
667
668    }
669
670}
671