1/*
2 * Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24/* @test
25 * @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 7142919
26 *      8151582 8068693 8153209
27 * @run main/othervm AsyncCloseAndInterrupt
28 * @key intermittent
29 * @summary Comprehensive test of asynchronous closing and interruption
30 * @author Mark Reinhold
31 */
32
33import java.io.*;
34import java.net.*;
35import java.nio.channels.*;
36import java.nio.ByteBuffer;
37import java.util.ArrayList;
38import java.util.List;
39import java.util.concurrent.ExecutorService;
40import java.util.concurrent.Executors;
41import java.util.concurrent.ThreadFactory;
42import java.util.concurrent.Callable;
43import java.util.concurrent.Future;
44import java.util.concurrent.TimeUnit;
45
46public class AsyncCloseAndInterrupt {
47
48    static PrintStream log = System.err;
49
50    static void sleep(int ms) {
51        try {
52            Thread.sleep(ms);
53        } catch (InterruptedException x) { }
54    }
55
56    // Wildcard address localized to this machine -- Windoze doesn't allow
57    // connecting to a server socket that was previously bound to a true
58    // wildcard, namely new InetSocketAddress((InetAddress)null, 0).
59    //
60    private static InetSocketAddress wildcardAddress;
61
62
63    // Server socket that blindly accepts all connections
64
65    static ServerSocketChannel acceptor;
66
67    private static void initAcceptor() throws IOException {
68        acceptor = ServerSocketChannel.open();
69        acceptor.socket().bind(wildcardAddress);
70
71        Thread th = new Thread("Acceptor") {
72                public void run() {
73                    try {
74                        for (;;) {
75                            SocketChannel sc = acceptor.accept();
76                        }
77                    } catch (IOException x) {
78                        x.printStackTrace();
79                    }
80                }
81            };
82
83        th.setDaemon(true);
84        th.start();
85    }
86
87
88    // Server socket that refuses all connections
89
90    static ServerSocketChannel refuser;
91
92    private static void initRefuser() throws IOException {
93        refuser = ServerSocketChannel.open();
94        refuser.bind(wildcardAddress, 1);      // use minimum backlog
95    }
96
97    // Dead pipe source and sink
98
99    static Pipe.SourceChannel deadSource;
100    static Pipe.SinkChannel deadSink;
101
102    private static void initPipes() throws IOException {
103        if (deadSource != null)
104            deadSource.close();
105        deadSource = Pipe.open().source();
106        if (deadSink != null)
107            deadSink.close();
108        deadSink = Pipe.open().sink();
109    }
110
111
112    // Files
113
114    private static File fifoFile = null; // File that blocks on reads and writes
115    private static File diskFile = null; // Disk file
116
117    private static void initFile() throws Exception {
118
119        diskFile = File.createTempFile("aci", ".tmp");
120        diskFile.deleteOnExit();
121        FileChannel fc = new FileOutputStream(diskFile).getChannel();
122        buffer.clear();
123        if (fc.write(buffer) != buffer.capacity())
124            throw new RuntimeException("Cannot create disk file");
125        fc.close();
126
127        if (TestUtil.onWindows()) {
128            log.println("WARNING: Cannot completely test FileChannels on Windows");
129            return;
130        }
131        fifoFile = new File("x.fifo");
132        if (fifoFile.exists()) {
133            if (!fifoFile.delete())
134                throw new IOException("Cannot delete existing fifo " + fifoFile);
135        }
136        Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);
137        if (p.waitFor() != 0)
138            throw new IOException("Error creating fifo");
139        new RandomAccessFile(fifoFile, "rw").close();
140
141    }
142
143
144    // Channel factories
145
146    static abstract class ChannelFactory {
147        private final String name;
148        ChannelFactory(String name) {
149            this.name = name;
150        }
151        public String toString() {
152            return name;
153        }
154        abstract InterruptibleChannel create() throws IOException;
155    }
156
157    static ChannelFactory socketChannelFactory
158        = new ChannelFactory("SocketChannel") {
159                InterruptibleChannel create() throws IOException {
160                    return SocketChannel.open();
161                }
162            };
163
164    static ChannelFactory connectedSocketChannelFactory
165        = new ChannelFactory("SocketChannel") {
166                InterruptibleChannel create() throws IOException {
167                    SocketAddress sa = acceptor.socket().getLocalSocketAddress();
168                    return SocketChannel.open(sa);
169                }
170            };
171
172    static ChannelFactory serverSocketChannelFactory
173        = new ChannelFactory("ServerSocketChannel") {
174                InterruptibleChannel create() throws IOException {
175                    ServerSocketChannel ssc = ServerSocketChannel.open();
176                    ssc.socket().bind(wildcardAddress);
177                    return ssc;
178                }
179            };
180
181    static ChannelFactory datagramChannelFactory
182        = new ChannelFactory("DatagramChannel") {
183                InterruptibleChannel create() throws IOException {
184                    DatagramChannel dc = DatagramChannel.open();
185                    InetAddress lb = InetAddress.getByName("127.0.0.1");
186                    dc.bind(new InetSocketAddress(lb, 0));
187                    dc.connect(new InetSocketAddress(lb, 80));
188                    return dc;
189                }
190            };
191
192    static ChannelFactory pipeSourceChannelFactory
193        = new ChannelFactory("Pipe.SourceChannel") {
194                InterruptibleChannel create() throws IOException {
195                    // ## arrange to close sink
196                    return Pipe.open().source();
197                }
198            };
199
200    static ChannelFactory pipeSinkChannelFactory
201        = new ChannelFactory("Pipe.SinkChannel") {
202                InterruptibleChannel create() throws IOException {
203                    // ## arrange to close source
204                    return Pipe.open().sink();
205                }
206            };
207
208    static ChannelFactory fifoFileChannelFactory
209        = new ChannelFactory("FileChannel") {
210                InterruptibleChannel create() throws IOException {
211                    return new RandomAccessFile(fifoFile, "rw").getChannel();
212                }
213            };
214
215    static ChannelFactory diskFileChannelFactory
216        = new ChannelFactory("FileChannel") {
217                InterruptibleChannel create() throws IOException {
218                    return new RandomAccessFile(diskFile, "rw").getChannel();
219                }
220            };
221
222
223    // I/O operations
224
225    static abstract class Op {
226        private final String name;
227        protected Op(String name) {
228            this.name = name;
229        }
230        abstract void doIO(InterruptibleChannel ich) throws IOException;
231        void setup() throws IOException { }
232        public String toString() { return name; }
233    }
234
235    static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
236
237    static ByteBuffer[] buffers = new ByteBuffer[] {
238        ByteBuffer.allocateDirect(1 << 19),
239        ByteBuffer.allocateDirect(1 << 19)
240    };
241
242    static void clearBuffers() {
243        buffers[0].clear();
244        buffers[1].clear();
245    }
246
247    static void show(Channel ch) {
248        log.print("Channel " + (ch.isOpen() ? "open" : "closed"));
249        if (ch.isOpen() && (ch instanceof SocketChannel)) {
250            SocketChannel sc = (SocketChannel)ch;
251            if (sc.socket().isInputShutdown())
252                log.print(", input shutdown");
253            if (sc.socket().isOutputShutdown())
254                log.print(", output shutdown");
255        }
256        log.println();
257    }
258
259    static final Op READ = new Op("read") {
260            void doIO(InterruptibleChannel ich) throws IOException {
261                ReadableByteChannel rbc = (ReadableByteChannel)ich;
262                buffer.clear();
263                int n = rbc.read(buffer);
264                log.println("Read returned " + n);
265                show(rbc);
266                if     (rbc.isOpen()
267                        && (n == -1)
268                        && (rbc instanceof SocketChannel)
269                        && ((SocketChannel)rbc).socket().isInputShutdown()) {
270                    return;
271                }
272                throw new RuntimeException("Read succeeded");
273            }
274        };
275
276    static final Op READV = new Op("readv") {
277            void doIO(InterruptibleChannel ich) throws IOException {
278                ScatteringByteChannel sbc = (ScatteringByteChannel)ich;
279                clearBuffers();
280                int n = (int)sbc.read(buffers);
281                log.println("Read returned " + n);
282                show(sbc);
283                if     (sbc.isOpen()
284                        && (n == -1)
285                        && (sbc instanceof SocketChannel)
286                        && ((SocketChannel)sbc).socket().isInputShutdown()) {
287                    return;
288                }
289                throw new RuntimeException("Read succeeded");
290            }
291        };
292
293    static final Op RECEIVE = new Op("receive") {
294            void doIO(InterruptibleChannel ich) throws IOException {
295                DatagramChannel dc = (DatagramChannel)ich;
296                buffer.clear();
297                dc.receive(buffer);
298                show(dc);
299                throw new RuntimeException("Read succeeded");
300            }
301        };
302
303    static final Op WRITE = new Op("write") {
304            void doIO(InterruptibleChannel ich) throws IOException {
305
306                WritableByteChannel wbc = (WritableByteChannel)ich;
307
308                SocketChannel sc = null;
309                if (wbc instanceof SocketChannel)
310                    sc = (SocketChannel)wbc;
311
312                int n = 0;
313                for (;;) {
314                    buffer.clear();
315                    int d = wbc.write(buffer);
316                    n += d;
317                    if (!wbc.isOpen())
318                        break;
319                    if ((sc != null) && sc.socket().isOutputShutdown())
320                        break;
321                }
322                log.println("Wrote " + n + " bytes");
323                show(wbc);
324            }
325        };
326
327    static final Op WRITEV = new Op("writev") {
328            void doIO(InterruptibleChannel ich) throws IOException {
329
330                GatheringByteChannel gbc = (GatheringByteChannel)ich;
331
332                SocketChannel sc = null;
333                if (gbc instanceof SocketChannel)
334                    sc = (SocketChannel)gbc;
335
336                int n = 0;
337                for (;;) {
338                    clearBuffers();
339                    int d = (int)gbc.write(buffers);
340                    n += d;
341                    if (!gbc.isOpen())
342                        break;
343                    if ((sc != null) && sc.socket().isOutputShutdown())
344                        break;
345                }
346                log.println("Wrote " + n + " bytes");
347                show(gbc);
348
349            }
350        };
351
352    static final Op CONNECT = new Op("connect") {
353            void setup() {
354                waitPump("connect waiting for pumping refuser ...");
355            }
356            void doIO(InterruptibleChannel ich) throws IOException {
357                SocketChannel sc = (SocketChannel)ich;
358                if (sc.connect(refuser.socket().getLocalSocketAddress()))
359                    throw new RuntimeException("Connection succeeded");
360                throw new RuntimeException("Connection did not block");
361            }
362        };
363
364    static final Op FINISH_CONNECT = new Op("finishConnect") {
365            void setup() {
366                waitPump("finishConnect waiting for pumping refuser ...");
367            }
368            void doIO(InterruptibleChannel ich) throws IOException {
369                SocketChannel sc = (SocketChannel)ich;
370                sc.configureBlocking(false);
371                SocketAddress sa = refuser.socket().getLocalSocketAddress();
372                if (sc.connect(sa))
373                    throw new RuntimeException("Connection succeeded");
374                sc.configureBlocking(true);
375                if (sc.finishConnect())
376                    throw new RuntimeException("Connection succeeded");
377                throw new RuntimeException("Connection did not block");
378            }
379        };
380
381    static final Op ACCEPT = new Op("accept") {
382            void doIO(InterruptibleChannel ich) throws IOException {
383                ServerSocketChannel ssc = (ServerSocketChannel)ich;
384                ssc.accept();
385                throw new RuntimeException("Accept succeeded");
386            }
387        };
388
389    // Use only with diskFileChannelFactory
390    static final Op TRANSFER_TO = new Op("transferTo") {
391            void doIO(InterruptibleChannel ich) throws IOException {
392                FileChannel fc = (FileChannel)ich;
393                long n = fc.transferTo(0, fc.size(), deadSink);
394                log.println("Transferred " + n + " bytes");
395                show(fc);
396            }
397        };
398
399    // Use only with diskFileChannelFactory
400    static final Op TRANSFER_FROM = new Op("transferFrom") {
401            void doIO(InterruptibleChannel ich) throws IOException {
402                FileChannel fc = (FileChannel)ich;
403                long n = fc.transferFrom(deadSource, 0, 1 << 20);
404                log.println("Transferred " + n + " bytes");
405                show(fc);
406            }
407        };
408
409
410
411    // Test modes
412
413    static final int TEST_PREINTR = 0;  // Interrupt thread before I/O
414    static final int TEST_INTR = 1;     // Interrupt thread during I/O
415    static final int TEST_CLOSE = 2;    // Close channel during I/O
416    static final int TEST_SHUTI = 3;    // Shutdown input during I/O
417    static final int TEST_SHUTO = 4;    // Shutdown output during I/O
418
419    static final String[] testName = new String[] {
420        "pre-interrupt", "interrupt", "close",
421        "shutdown-input", "shutdown-output"
422    };
423
424
425    static class Tester extends TestThread {
426
427        private InterruptibleChannel ch;
428        private Op op;
429        private int test;
430        volatile boolean ready = false;
431
432        protected Tester(ChannelFactory cf, InterruptibleChannel ch,
433                         Op op, int test)
434        {
435            super(cf + "/" + op + "/" + testName[test]);
436            this.ch = ch;
437            this.op = op;
438            this.test = test;
439        }
440
441        @SuppressWarnings("fallthrough")
442        private void caught(Channel ch, IOException x) {
443            String xn = x.getClass().getName();
444            switch (test) {
445
446            case TEST_PREINTR:
447            case TEST_INTR:
448                if (!xn.equals("java.nio.channels.ClosedByInterruptException"))
449                    throw new RuntimeException("Wrong exception thrown: " + x);
450                break;
451
452            case TEST_CLOSE:
453            case TEST_SHUTO:
454                if (!xn.equals("java.nio.channels.AsynchronousCloseException"))
455                    throw new RuntimeException("Wrong exception thrown: " + x);
456                break;
457
458            case TEST_SHUTI:
459                if (TestUtil.onWindows())
460                    break;
461                // FALL THROUGH
462
463            default:
464                throw new Error(x);
465            }
466
467            if (ch.isOpen()) {
468                if (test == TEST_SHUTO) {
469                    SocketChannel sc = (SocketChannel)ch;
470                    if (!sc.socket().isOutputShutdown())
471                        throw new RuntimeException("Output not shutdown");
472                } else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {
473                    // Let this case pass -- CBIE applies to other channel
474                } else {
475                    throw new RuntimeException("Channel still open");
476                }
477            }
478
479            log.println("Thrown as expected: " + x);
480        }
481
482        final void go() throws Exception {
483            if (test == TEST_PREINTR)
484                Thread.currentThread().interrupt();
485            ready = true;
486            try {
487                op.doIO(ch);
488            } catch (ClosedByInterruptException x) {
489                caught(ch, x);
490            } catch (AsynchronousCloseException x) {
491                caught(ch, x);
492            } finally {
493                ch.close();
494            }
495        }
496
497    }
498
499    private static volatile boolean pumpDone = false;
500    private static volatile boolean pumpReady = false;
501
502    private static void waitPump(String msg){
503        log.println(msg);
504        while (!pumpReady){
505            sleep(200);
506        }
507        log.println(msg + " done");
508    }
509
510    // Create a pump thread dedicated to saturate refuser's connection backlog
511    private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) {
512
513        Callable<Integer> pumpTask = new Callable<Integer>() {
514
515            @Override
516            public Integer call() throws IOException {
517                // Can't reliably saturate connection backlog on Windows Server editions
518                assert !TestUtil.onWindows();
519                log.println("Start pumping refuser ...");
520                List<SocketChannel> refuserClients = new ArrayList<>();
521
522                // Saturate the refuser's connection backlog so that further connection
523                // attempts will be blocked
524                pumpReady = false;
525                while (!pumpDone) {
526                    SocketChannel sc = SocketChannel.open();
527                    sc.configureBlocking(false);
528                    boolean connected = sc.connect(refuser.socket().getLocalSocketAddress());
529
530                    // Assume that the connection backlog is saturated if a
531                    // client cannot connect to the refuser within 50 milliseconds
532                    long start = System.currentTimeMillis();
533                    while (!pumpReady && !connected
534                            && (System.currentTimeMillis() - start < 50)) {
535                        connected = sc.finishConnect();
536                    }
537
538                    if (connected) {
539                        // Retain so that finalizer doesn't close
540                        refuserClients.add(sc);
541                    } else {
542                        sc.close();
543                        pumpReady = true;
544                    }
545                }
546
547                for (SocketChannel sc : refuserClients) {
548                    sc.close();
549                }
550                refuser.close();
551
552                log.println("Stop pumping refuser ...");
553                return refuserClients.size();
554            }
555        };
556
557        return pumperExecutor.submit(pumpTask);
558    }
559
560    // Test
561    static void test(ChannelFactory cf, Op op, int test) throws Exception {
562        test(cf, op, test, true);
563    }
564
565    static void test(ChannelFactory cf, Op op, int test, boolean extraSleep)
566        throws Exception
567    {
568        log.println();
569        initPipes();
570        InterruptibleChannel ch = cf.create();
571        Tester t = new Tester(cf, ch, op, test);
572        log.println(t);
573        op.setup();
574        t.start();
575        do {
576            sleep(50);
577        } while (!t.ready);
578
579        if (extraSleep) {
580            sleep(100);
581        }
582
583        switch (test) {
584
585        case TEST_INTR:
586            t.interrupt();
587            break;
588
589        case TEST_CLOSE:
590            ch.close();
591            break;
592
593        case TEST_SHUTI:
594            if (TestUtil.onWindows()) {
595                log.println("WARNING: Asynchronous shutdown not working on Windows");
596                ch.close();
597            } else {
598                ((SocketChannel)ch).socket().shutdownInput();
599            }
600            break;
601
602        case TEST_SHUTO:
603            if (TestUtil.onWindows()) {
604                log.println("WARNING: Asynchronous shutdown not working on Windows");
605                ch.close();
606            } else {
607                ((SocketChannel)ch).socket().shutdownOutput();
608            }
609            break;
610
611        default:
612            break;
613        }
614
615        t.finishAndThrow(10000);
616    }
617
618    static void test(ChannelFactory cf, Op op) throws Exception {
619        test(cf, op, true);
620    }
621
622    static void test(ChannelFactory cf, Op op, boolean extraSleep) throws Exception {
623        // Test INTR cases before PREINTER cases since sometimes
624        // interrupted threads can't load classes
625        test(cf, op, TEST_INTR, extraSleep);
626        test(cf, op, TEST_PREINTR, extraSleep);
627
628        // Bugs, see FileChannelImpl for details
629        if (op == TRANSFER_FROM) {
630            log.println("WARNING: transferFrom/close not tested");
631            return;
632        }
633        if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {
634            log.println("WARNING: transferTo/close not tested");
635            return;
636        }
637
638        test(cf, op, TEST_CLOSE, extraSleep);
639    }
640
641    static void test(ChannelFactory cf)
642        throws Exception
643    {
644        InterruptibleChannel ch = cf.create(); // Sample channel
645        ch.close();
646
647        if (ch instanceof ReadableByteChannel) {
648            test(cf, READ);
649            if (ch instanceof SocketChannel)
650                test(cf, READ, TEST_SHUTI);
651        }
652
653        if (ch instanceof ScatteringByteChannel) {
654            test(cf, READV);
655            if (ch instanceof SocketChannel)
656                test(cf, READV, TEST_SHUTI);
657        }
658
659        if (ch instanceof DatagramChannel) {
660            test(cf, RECEIVE);
661
662            // Return here: We can't effectively test writes since, if they
663            // block, they do so only for a fleeting moment unless the network
664            // interface is overloaded.
665            return;
666
667        }
668
669        if (ch instanceof WritableByteChannel) {
670            test(cf, WRITE);
671            if (ch instanceof SocketChannel)
672                test(cf, WRITE, TEST_SHUTO);
673        }
674
675        if (ch instanceof GatheringByteChannel) {
676            test(cf, WRITEV);
677            if (ch instanceof SocketChannel)
678                test(cf, WRITEV, TEST_SHUTO);
679        }
680
681    }
682
683    public static void main(String[] args) throws Exception {
684
685        wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
686        initAcceptor();
687        if (!TestUtil.onWindows())
688            initRefuser();
689        initPipes();
690        initFile();
691
692        if (TestUtil.onWindows()) {
693            log.println("WARNING: Cannot test FileChannel transfer operations"
694                        + " on Windows");
695        } else {
696            test(diskFileChannelFactory, TRANSFER_TO);
697            test(diskFileChannelFactory, TRANSFER_FROM);
698        }
699        if (fifoFile != null)
700            test(fifoFileChannelFactory);
701
702        // Testing positional file reads and writes is impractical: It requires
703        // access to a large file soft-mounted via NFS, and even then isn't
704        // completely guaranteed to work.
705        //
706        // Testing map is impractical and arguably unnecessary: It's
707        // unclear under what conditions mmap(2) will actually block.
708
709        test(connectedSocketChannelFactory);
710
711        if (TestUtil.onWindows()) {
712            log.println("WARNING Cannot reliably test connect/finishConnect"
713                + " operations on Windows");
714        } else {
715            // Only the following tests need refuser's connection backlog
716            // to be saturated
717            ExecutorService pumperExecutor =
718                    Executors.newSingleThreadExecutor(
719                    new ThreadFactory() {
720
721                        @Override
722                        public Thread newThread(Runnable r) {
723                            Thread t = new Thread(r);
724                            t.setDaemon(true);
725                            t.setName("Pumper");
726                            return t;
727                        }
728                    });
729
730            pumpDone = false;
731            try {
732                Future<Integer> pumpFuture = pumpRefuser(pumperExecutor);
733                waitPump("\nWait for initial Pump");
734
735                test(socketChannelFactory, CONNECT, false);
736                test(socketChannelFactory, FINISH_CONNECT, false);
737
738                pumpDone = true;
739                Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS);
740                log.println("Pump " + newConn + " connections.");
741            } finally {
742                pumperExecutor.shutdown();
743            }
744        }
745
746        test(serverSocketChannelFactory, ACCEPT);
747        test(datagramChannelFactory);
748        test(pipeSourceChannelFactory);
749        test(pipeSinkChannelFactory);
750    }
751}
752