Basic.java revision 11822:110f7f35760f
1189251Ssam/*
2189251Ssam * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
3214734Srpaulo * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4189251Ssam *
5252726Srpaulo * This code is free software; you can redistribute it and/or modify it
6252726Srpaulo * under the terms of the GNU General Public License version 2 only, as
7189251Ssam * published by the Free Software Foundation.
8189251Ssam *
9189251Ssam * This code is distributed in the hope that it will be useful, but WITHOUT
10189251Ssam * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11189251Ssam * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12189251Ssam * version 2 for more details (a copy is included in the LICENSE file that
13189251Ssam * accompanied this code).
14189251Ssam *
15189251Ssam * You should have received a copy of the GNU General Public License version
16189251Ssam * 2 along with this work; if not, write to the Free Software Foundation,
17189251Ssam * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18189251Ssam *
19189251Ssam * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20189251Ssam * or visit www.oracle.com if you need additional information or have any
21189251Ssam * questions.
22189251Ssam */
23189251Ssam
24189251Ssam/* @test
25189251Ssam * @bug 4607272 6842687 6878369 6944810 7023403
26189251Ssam * @summary Unit test for AsynchronousSocketChannel
27189251Ssam * @run main Basic -skipSlowConnectTest
28189251Ssam * @key randomness
29189251Ssam */
30214734Srpaulo
31189251Ssamimport java.nio.ByteBuffer;
32214734Srpauloimport java.nio.channels.*;
33189251Ssamimport static java.net.StandardSocketOptions.*;
34189251Ssamimport java.net.*;
35189251Ssamimport java.util.Random;
36189251Ssamimport java.util.concurrent.*;
37189251Ssamimport java.util.concurrent.atomic.*;
38189251Ssamimport java.io.Closeable;
39189251Ssamimport java.io.IOException;
40214734Srpaulo
41189251Ssampublic class Basic {
42214734Srpaulo    static final Random rand = new Random();
43189251Ssam
44214734Srpaulo    static boolean skipSlowConnectTest = false;
45214734Srpaulo
46189251Ssam    public static void main(String[] args) throws Exception {
47189251Ssam        for (String arg: args) {
48189251Ssam            switch (arg) {
49189251Ssam            case "-skipSlowConnectTest" :
50189251Ssam                skipSlowConnectTest = true;
51214734Srpaulo                break;
52189251Ssam            default:
53214734Srpaulo                throw new RuntimeException("Unrecognized argument: " + arg);
54214734Srpaulo            }
55189251Ssam        }
56189251Ssam
57189251Ssam        testBind();
58189251Ssam        testSocketOptions();
59189251Ssam        testConnect();
60189251Ssam        testCloseWhenPending();
61189251Ssam        testCancel();
62189251Ssam        testRead1();
63189251Ssam        testRead2();
64189251Ssam        testRead3();
65189251Ssam        testWrite1();
66189251Ssam        testWrite2();
67189251Ssam        // skip timeout tests until 7052549 is fixed
68189251Ssam        if (!System.getProperty("os.name").startsWith("Windows"))
69189251Ssam            testTimeout();
70189251Ssam        testShutdown();
71189251Ssam    }
72189251Ssam
73189251Ssam    static class Server implements Closeable {
74189251Ssam        private final ServerSocketChannel ssc;
75189251Ssam        private final InetSocketAddress address;
76189251Ssam
77214734Srpaulo        Server() throws IOException {
78189251Ssam            ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
79214734Srpaulo
80214734Srpaulo            InetAddress lh = InetAddress.getLocalHost();
81189251Ssam            int port = ((InetSocketAddress)(ssc.getLocalAddress())).getPort();
82189251Ssam            address = new InetSocketAddress(lh, port);
83189251Ssam        }
84189251Ssam
85189251Ssam        InetSocketAddress address() {
86189251Ssam            return address;
87189251Ssam        }
88189251Ssam
89189251Ssam        SocketChannel accept() throws IOException {
90189251Ssam            return ssc.accept();
91189251Ssam        }
92189251Ssam
93189251Ssam        public void close() throws IOException {
94189251Ssam            ssc.close();
95189251Ssam        }
96189251Ssam
97189251Ssam    }
98189251Ssam
99189251Ssam    static void testBind() throws Exception {
100189251Ssam        System.out.println("-- bind --");
101189251Ssam
102189251Ssam        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
103189251Ssam            if (ch.getLocalAddress() != null)
104189251Ssam                throw new RuntimeException("Local address should be 'null'");
105189251Ssam            ch.bind(new InetSocketAddress(0));
106189251Ssam
107189251Ssam            // check local address after binding
108189251Ssam            InetSocketAddress local = (InetSocketAddress)ch.getLocalAddress();
109189251Ssam            if (local.getPort() == 0)
110189251Ssam                throw new RuntimeException("Unexpected port");
111189251Ssam            if (!local.getAddress().isAnyLocalAddress())
112189251Ssam                throw new RuntimeException("Not bound to a wildcard address");
113189251Ssam
114189251Ssam            // try to re-bind
115189251Ssam            try {
116189251Ssam                ch.bind(new InetSocketAddress(0));
117189251Ssam                throw new RuntimeException("AlreadyBoundException expected");
118189251Ssam            } catch (AlreadyBoundException x) {
119189251Ssam            }
120189251Ssam        }
121189251Ssam
122189251Ssam        // check ClosedChannelException
123189251Ssam        AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
124189251Ssam        ch.close();
125189251Ssam        try {
126189251Ssam            ch.bind(new InetSocketAddress(0));
127189251Ssam            throw new RuntimeException("ClosedChannelException  expected");
128189251Ssam        } catch (ClosedChannelException  x) {
129189251Ssam        }
130189251Ssam    }
131189251Ssam
132189251Ssam    static void testSocketOptions() throws Exception {
133189251Ssam        System.out.println("-- socket options --");
134189251Ssam
135189251Ssam        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
136189251Ssam            ch.setOption(SO_RCVBUF, 128*1024)
137252726Srpaulo              .setOption(SO_SNDBUF, 128*1024)
138252726Srpaulo              .setOption(SO_REUSEADDR, true);
139189251Ssam
140189251Ssam            // check SO_SNDBUF/SO_RCVBUF limits
141189251Ssam            int before, after;
142189251Ssam            before = ch.getOption(SO_SNDBUF);
143189251Ssam            after = ch.setOption(SO_SNDBUF, Integer.MAX_VALUE).getOption(SO_SNDBUF);
144189251Ssam            if (after < before)
145189251Ssam                throw new RuntimeException("setOption caused SO_SNDBUF to decrease");
146189251Ssam            before = ch.getOption(SO_RCVBUF);
147189251Ssam            after = ch.setOption(SO_RCVBUF, Integer.MAX_VALUE).getOption(SO_RCVBUF);
148189251Ssam            if (after < before)
149189251Ssam                throw new RuntimeException("setOption caused SO_RCVBUF to decrease");
150189251Ssam
151189251Ssam            ch.bind(new InetSocketAddress(0));
152189251Ssam
153189251Ssam            // default values
154189251Ssam            if (ch.getOption(SO_KEEPALIVE))
155189251Ssam                throw new RuntimeException("Default of SO_KEEPALIVE should be 'false'");
156189251Ssam            if (ch.getOption(TCP_NODELAY))
157189251Ssam                throw new RuntimeException("Default of TCP_NODELAY should be 'false'");
158189251Ssam
159189251Ssam            // set and check
160189251Ssam            if (!ch.setOption(SO_KEEPALIVE, true).getOption(SO_KEEPALIVE))
161189251Ssam                throw new RuntimeException("SO_KEEPALIVE did not change");
162189251Ssam            if (!ch.setOption(TCP_NODELAY, true).getOption(TCP_NODELAY))
163189251Ssam                throw new RuntimeException("SO_KEEPALIVE did not change");
164189251Ssam
165189251Ssam            // read others (can't check as actual value is implementation dependent)
166189251Ssam            ch.getOption(SO_RCVBUF);
167189251Ssam            ch.getOption(SO_SNDBUF);
168189251Ssam        }
169189251Ssam    }
170189251Ssam
171189251Ssam    static void testConnect() throws Exception {
172189251Ssam        System.out.println("-- connect --");
173189251Ssam
174189251Ssam        SocketAddress address;
175189251Ssam
176189251Ssam        try (Server server = new Server()) {
177189251Ssam            address = server.address();
178189251Ssam
179189251Ssam            // connect to server and check local/remote addresses
180189251Ssam            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
181189251Ssam                ch.connect(address).get();
182189251Ssam                // check local address
183189251Ssam                if (ch.getLocalAddress() == null)
184189251Ssam                    throw new RuntimeException("Not bound to local address");
185189251Ssam
186189251Ssam                // check remote address
187189251Ssam                InetSocketAddress remote = (InetSocketAddress)ch.getRemoteAddress();
188189251Ssam                if (remote.getPort() != server.address().getPort())
189189251Ssam                    throw new RuntimeException("Connected to unexpected port");
190189251Ssam                if (!remote.getAddress().equals(server.address().getAddress()))
191189251Ssam                    throw new RuntimeException("Connected to unexpected address");
192189251Ssam
193189251Ssam                // try to connect again
194189251Ssam                try {
195189251Ssam                    ch.connect(server.address()).get();
196189251Ssam                    throw new RuntimeException("AlreadyConnectedException expected");
197189251Ssam                } catch (AlreadyConnectedException x) {
198189251Ssam                }
199189251Ssam
200189251Ssam                // clean-up
201189251Ssam                server.accept().close();
202189251Ssam            }
203189251Ssam
204189251Ssam            // check that connect fails with ClosedChannelException
205189251Ssam            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
206189251Ssam            ch.close();
207189251Ssam            try {
208189251Ssam                ch.connect(server.address()).get();
209189251Ssam                throw new RuntimeException("ExecutionException expected");
210189251Ssam            } catch (ExecutionException x) {
211189251Ssam                if (!(x.getCause() instanceof ClosedChannelException))
212189251Ssam                    throw new RuntimeException("Cause of ClosedChannelException expected");
213189251Ssam            }
214189251Ssam            final AtomicReference<Throwable> connectException = new AtomicReference<>();
215189251Ssam            ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() {
216189251Ssam                public void completed(Void result, Void att) {
217189251Ssam                }
218189251Ssam                public void failed(Throwable exc, Void att) {
219189251Ssam                    connectException.set(exc);
220189251Ssam                }
221189251Ssam            });
222189251Ssam            while (connectException.get() == null) {
223189251Ssam                Thread.sleep(100);
224189251Ssam            }
225189251Ssam            if (!(connectException.get() instanceof ClosedChannelException))
226189251Ssam                throw new RuntimeException("ClosedChannelException expected");
227189251Ssam        }
228189251Ssam
229189251Ssam        // test that failure to connect closes the channel
230189251Ssam        try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
231189251Ssam            try {
232189251Ssam                ch.connect(address).get();
233189251Ssam            } catch (ExecutionException x) {
234189251Ssam                // failed to establish connection
235189251Ssam                if (ch.isOpen())
236189251Ssam                    throw new RuntimeException("Channel should be closed");
237189251Ssam            }
238189251Ssam        }
239189251Ssam
240189251Ssam        // repeat test by connecting to a (probably) non-existent host. This
241189251Ssam        // improves the chance that the connect will not fail immediately.
242189251Ssam        if (!skipSlowConnectTest) {
243189251Ssam            try (AsynchronousSocketChannel ch = AsynchronousSocketChannel.open()) {
244189251Ssam                try {
245189251Ssam                    ch.connect(genSocketAddress()).get();
246189251Ssam                } catch (ExecutionException x) {
247189251Ssam                    // failed to establish connection
248189251Ssam                    if (ch.isOpen())
249189251Ssam                        throw new RuntimeException("Channel should be closed");
250189251Ssam                }
251189251Ssam            }
252189251Ssam        }
253189251Ssam    }
254189251Ssam
255189251Ssam    static void testCloseWhenPending() throws Exception {
256189251Ssam        System.out.println("-- asynchronous close when connecting --");
257189251Ssam
258189251Ssam        AsynchronousSocketChannel ch;
259189251Ssam
260189251Ssam        // asynchronous close while connecting
261189251Ssam        ch = AsynchronousSocketChannel.open();
262189251Ssam        Future<Void> connectResult = ch.connect(genSocketAddress());
263189251Ssam
264189251Ssam        // give time to initiate the connect (SYN)
265189251Ssam        Thread.sleep(50);
266189251Ssam
267189251Ssam        // close
268189251Ssam        ch.close();
269189251Ssam
270189251Ssam        // check that exception is thrown in timely manner
271189251Ssam        try {
272189251Ssam            connectResult.get(5, TimeUnit.SECONDS);
273189251Ssam        } catch (TimeoutException x) {
274189251Ssam            throw new RuntimeException("AsynchronousCloseException not thrown");
275189251Ssam        } catch (ExecutionException x) {
276189251Ssam            // expected
277189251Ssam        }
278214734Srpaulo
279189251Ssam        System.out.println("-- asynchronous close when reading --");
280189251Ssam
281189251Ssam        try (Server server = new Server()) {
282189251Ssam            ch = AsynchronousSocketChannel.open();
283189251Ssam            ch.connect(server.address()).get();
284189251Ssam
285189251Ssam            ByteBuffer dst = ByteBuffer.allocateDirect(100);
286214734Srpaulo            Future<Integer> result = ch.read(dst);
287214734Srpaulo
288189251Ssam            // attempt a second read - should fail with ReadPendingException
289189251Ssam            ByteBuffer buf = ByteBuffer.allocateDirect(100);
290189251Ssam            try {
291189251Ssam                ch.read(buf);
292189251Ssam                throw new RuntimeException("ReadPendingException expected");
293189251Ssam            } catch (ReadPendingException x) {
294189251Ssam            }
295189251Ssam
296189251Ssam            // close channel (should cause initial read to complete)
297189251Ssam            ch.close();
298189251Ssam            server.accept().close();
299189251Ssam
300189251Ssam            // check that AsynchronousCloseException is thrown
301189251Ssam            try {
302189251Ssam                result.get();
303189251Ssam                throw new RuntimeException("Should not read");
304189251Ssam            } catch (ExecutionException x) {
305189251Ssam                if (!(x.getCause() instanceof AsynchronousCloseException))
306189251Ssam                    throw new RuntimeException(x);
307189251Ssam            }
308189251Ssam
309189251Ssam            System.out.println("-- asynchronous close when writing --");
310189251Ssam
311189251Ssam            ch = AsynchronousSocketChannel.open();
312189251Ssam            ch.connect(server.address()).get();
313189251Ssam
314189251Ssam            final AtomicReference<Throwable> writeException =
315189251Ssam                new AtomicReference<Throwable>();
316189251Ssam
317189251Ssam            // write bytes to fill socket buffer
318189251Ssam            ch.write(genBuffer(), ch, new CompletionHandler<Integer,AsynchronousSocketChannel>() {
319189251Ssam                public void completed(Integer result, AsynchronousSocketChannel ch) {
320189251Ssam                    ch.write(genBuffer(), ch, this);
321189251Ssam                }
322189251Ssam                public void failed(Throwable x, AsynchronousSocketChannel ch) {
323189251Ssam                    writeException.set(x);
324189251Ssam                }
325189251Ssam            });
326189251Ssam
327189251Ssam            // give time for socket buffer to fill up.
328189251Ssam            Thread.sleep(5*1000);
329189251Ssam
330189251Ssam            //  attempt a concurrent write - should fail with WritePendingException
331189251Ssam            try {
332189251Ssam                ch.write(genBuffer());
333189251Ssam                throw new RuntimeException("WritePendingException expected");
334189251Ssam            } catch (WritePendingException x) {
335189251Ssam            }
336189251Ssam
337189251Ssam            // close channel - should cause initial write to complete
338189251Ssam            ch.close();
339189251Ssam            server.accept().close();
340189251Ssam
341189251Ssam            // wait for exception
342189251Ssam            while (writeException.get() == null) {
343189251Ssam                Thread.sleep(100);
344189251Ssam            }
345189251Ssam            if (!(writeException.get() instanceof AsynchronousCloseException))
346189251Ssam                throw new RuntimeException("AsynchronousCloseException expected");
347189251Ssam        }
348189251Ssam    }
349189251Ssam
350189251Ssam    static void testCancel() throws Exception {
351189251Ssam        System.out.println("-- cancel --");
352189251Ssam
353189251Ssam        try (Server server = new Server()) {
354189251Ssam            for (int i=0; i<2; i++) {
355189251Ssam                boolean mayInterruptIfRunning = (i == 0) ? false : true;
356189251Ssam
357189251Ssam                // establish loopback connection
358189251Ssam                AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
359189251Ssam                ch.connect(server.address()).get();
360189251Ssam                SocketChannel peer = server.accept();
361189251Ssam
362189251Ssam                // start read operation
363189251Ssam                ByteBuffer buf = ByteBuffer.allocate(1);
364189251Ssam                Future<Integer> res = ch.read(buf);
365189251Ssam
366189251Ssam                // cancel operation
367189251Ssam                boolean cancelled = res.cancel(mayInterruptIfRunning);
368189251Ssam
369189251Ssam                // check post-conditions
370189251Ssam                if (!res.isDone())
371189251Ssam                    throw new RuntimeException("isDone should return true");
372189251Ssam                if (res.isCancelled() != cancelled)
373189251Ssam                    throw new RuntimeException("isCancelled not consistent");
374189251Ssam                try {
375189251Ssam                    res.get();
376189251Ssam                    throw new RuntimeException("CancellationException expected");
377189251Ssam                } catch (CancellationException x) {
378189251Ssam                }
379189251Ssam                try {
380189251Ssam                    res.get(1, TimeUnit.SECONDS);
381189251Ssam                    throw new RuntimeException("CancellationException expected");
382189251Ssam                } catch (CancellationException x) {
383189251Ssam                }
384189251Ssam
385189251Ssam                // check that the cancel doesn't impact writing to the channel
386189251Ssam                if (!mayInterruptIfRunning) {
387189251Ssam                    buf = ByteBuffer.wrap("a".getBytes());
388189251Ssam                    ch.write(buf).get();
389189251Ssam                }
390189251Ssam
391189251Ssam                ch.close();
392189251Ssam                peer.close();
393189251Ssam            }
394189251Ssam        }
395189251Ssam    }
396189251Ssam
397189251Ssam    static void testRead1() throws Exception {
398189251Ssam        System.out.println("-- read (1) --");
399189251Ssam
400189251Ssam        try (Server server = new Server()) {
401189251Ssam            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
402189251Ssam            ch.connect(server.address()).get();
403189251Ssam
404189251Ssam            // read with 0 bytes remaining should complete immediately
405189251Ssam            ByteBuffer buf = ByteBuffer.allocate(1);
406189251Ssam            buf.put((byte)0);
407189251Ssam            int n = ch.read(buf).get();
408189251Ssam            if (n != 0)
409189251Ssam                throw new RuntimeException("0 expected");
410189251Ssam
411189251Ssam            // write bytes and close connection
412189251Ssam            ByteBuffer src = genBuffer();
413189251Ssam            try (SocketChannel sc = server.accept()) {
414189251Ssam                sc.setOption(SO_SNDBUF, src.remaining());
415189251Ssam                while (src.hasRemaining())
416189251Ssam                    sc.write(src);
417189251Ssam            }
418189251Ssam
419189251Ssam            // reads should complete immediately
420189251Ssam            final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
421189251Ssam            final CountDownLatch latch = new CountDownLatch(1);
422189251Ssam            ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
423189251Ssam                public void completed(Integer result, Void att) {
424189251Ssam                    int n = result;
425189251Ssam                    if (n > 0) {
426189251Ssam                        ch.read(dst, (Void)null, this);
427189251Ssam                    } else {
428189251Ssam                        latch.countDown();
429189251Ssam                    }
430189251Ssam                }
431189251Ssam                public void failed(Throwable exc, Void att) {
432189251Ssam                }
433214734Srpaulo            });
434214734Srpaulo
435214734Srpaulo            latch.await();
436214734Srpaulo
437214734Srpaulo            // check buffers
438214734Srpaulo            src.flip();
439214734Srpaulo            dst.flip();
440214734Srpaulo            if (!src.equals(dst)) {
441214734Srpaulo                throw new RuntimeException("Contents differ");
442214734Srpaulo            }
443214734Srpaulo
444214734Srpaulo            // close channel
445214734Srpaulo            ch.close();
446214734Srpaulo
447214734Srpaulo            // check read fails with ClosedChannelException
448214734Srpaulo            try {
449252726Srpaulo                ch.read(dst).get();
450252726Srpaulo                throw new RuntimeException("ExecutionException expected");
451252726Srpaulo            } catch (ExecutionException x) {
452252726Srpaulo                if (!(x.getCause() instanceof ClosedChannelException))
453252726Srpaulo                    throw new RuntimeException("Cause of ClosedChannelException expected");
454252726Srpaulo            }
455252726Srpaulo        }
456252726Srpaulo    }
457252726Srpaulo
458252726Srpaulo    static void testRead2() throws Exception {
459252726Srpaulo        System.out.println("-- read (2) --");
460189251Ssam
461        try (Server server = new Server()) {
462            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
463            ch.connect(server.address()).get();
464            SocketChannel sc = server.accept();
465
466            ByteBuffer src = genBuffer();
467
468            // read until the buffer is full
469            final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity());
470            final CountDownLatch latch = new CountDownLatch(1);
471            ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() {
472                public void completed(Integer result, Void att) {
473                    if (dst.hasRemaining()) {
474                        ch.read(dst, (Void)null, this);
475                    } else {
476                        latch.countDown();
477                    }
478                }
479                public void failed(Throwable exc, Void att) {
480                }
481            });
482
483            // trickle the writing
484            do {
485                int rem = src.remaining();
486                int size = (rem <= 100) ? rem : 50 + rand.nextInt(rem - 100);
487                ByteBuffer buf = ByteBuffer.allocate(size);
488                for (int i=0; i<size; i++)
489                    buf.put(src.get());
490                buf.flip();
491                Thread.sleep(50 + rand.nextInt(1500));
492                while (buf.hasRemaining())
493                    sc.write(buf);
494            } while (src.hasRemaining());
495
496            // wait until ascynrhonous reading has completed
497            latch.await();
498
499            // check buffers
500            src.flip();
501            dst.flip();
502            if (!src.equals(dst)) {
503               throw new RuntimeException("Contents differ");
504            }
505
506            sc.close();
507            ch.close();
508        }
509    }
510
511    // exercise scattering read
512    static void testRead3() throws Exception {
513        System.out.println("-- read (3) --");
514
515        try (Server server = new Server()) {
516            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
517            ch.connect(server.address()).get();
518            SocketChannel sc = server.accept();
519
520            ByteBuffer[] dsts = new ByteBuffer[3];
521            for (int i=0; i<dsts.length; i++) {
522                dsts[i] = ByteBuffer.allocateDirect(100);
523            }
524
525            // scattering read that completes ascynhronously
526            final CountDownLatch l1 = new CountDownLatch(1);
527            ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
528                new CompletionHandler<Long,Void>() {
529                    public void completed(Long result, Void att) {
530                        long n = result;
531                        if (n <= 0)
532                            throw new RuntimeException("No bytes read");
533                        l1.countDown();
534                    }
535                    public void failed(Throwable exc, Void att) {
536                    }
537            });
538
539            // write some bytes
540            sc.write(genBuffer());
541
542            // read should now complete
543            l1.await();
544
545            // write more bytes
546            sc.write(genBuffer());
547
548            // read should complete immediately
549            for (int i=0; i<dsts.length; i++) {
550                dsts[i].rewind();
551            }
552
553            final CountDownLatch l2 = new CountDownLatch(1);
554            ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null,
555                new CompletionHandler<Long,Void>() {
556                    public void completed(Long result, Void att) {
557                        long n = result;
558                        if (n <= 0)
559                            throw new RuntimeException("No bytes read");
560                        l2.countDown();
561                    }
562                    public void failed(Throwable exc, Void att) {
563                    }
564            });
565            l2.await();
566
567            ch.close();
568            sc.close();
569        }
570    }
571
572    static void testWrite1() throws Exception {
573        System.out.println("-- write (1) --");
574
575        try (Server server = new Server()) {
576            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
577            ch.connect(server.address()).get();
578            SocketChannel sc = server.accept();
579
580            // write with 0 bytes remaining should complete immediately
581            ByteBuffer buf = ByteBuffer.allocate(1);
582            buf.put((byte)0);
583            int n = ch.write(buf).get();
584            if (n != 0)
585                throw new RuntimeException("0 expected");
586
587            // write all bytes and close connection when done
588            final ByteBuffer src = genBuffer();
589            ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() {
590                public void completed(Integer result, Void att) {
591                    if (src.hasRemaining()) {
592                        ch.write(src, (Void)null, this);
593                    } else {
594                        try {
595                            ch.close();
596                        } catch (IOException ignore) { }
597                    }
598                }
599                public void failed(Throwable exc, Void att) {
600                }
601            });
602
603            // read to EOF or buffer full
604            ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100);
605            do {
606                n = sc.read(dst);
607            } while (n > 0);
608            sc.close();
609
610            // check buffers
611            src.flip();
612            dst.flip();
613            if (!src.equals(dst)) {
614                throw new RuntimeException("Contents differ");
615            }
616
617            // check write fails with ClosedChannelException
618            try {
619                ch.read(dst).get();
620                throw new RuntimeException("ExecutionException expected");
621            } catch (ExecutionException x) {
622                if (!(x.getCause() instanceof ClosedChannelException))
623                    throw new RuntimeException("Cause of ClosedChannelException expected");
624            }
625        }
626    }
627
628    // exercise gathering write
629    static void testWrite2() throws Exception {
630        System.out.println("-- write (2) --");
631
632        try (Server server = new Server()) {
633            final AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
634            ch.connect(server.address()).get();
635            SocketChannel sc = server.accept();
636
637            // number of bytes written
638            final AtomicLong bytesWritten = new AtomicLong(0);
639
640            // write buffers (should complete immediately)
641            ByteBuffer[] srcs = genBuffers(1);
642            final CountDownLatch l1 = new CountDownLatch(1);
643            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
644                new CompletionHandler<Long,Void>() {
645                    public void completed(Long result, Void att) {
646                        long n = result;
647                        if (n <= 0)
648                            throw new RuntimeException("No bytes read");
649                        bytesWritten.addAndGet(n);
650                        l1.countDown();
651                    }
652                    public void failed(Throwable exc, Void att) {
653                    }
654            });
655            l1.await();
656
657            // set to true to signal that no more buffers should be written
658            final AtomicBoolean continueWriting = new AtomicBoolean(true);
659
660            // write until socket buffer is full so as to create the conditions
661            // for when a write does not complete immediately
662            srcs = genBuffers(1);
663            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null,
664                new CompletionHandler<Long,Void>() {
665                    public void completed(Long result, Void att) {
666                        long n = result;
667                        if (n <= 0)
668                            throw new RuntimeException("No bytes written");
669                        bytesWritten.addAndGet(n);
670                        if (continueWriting.get()) {
671                            ByteBuffer[] srcs = genBuffers(8);
672                            ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS,
673                                (Void)null, this);
674                        }
675                    }
676                    public void failed(Throwable exc, Void att) {
677                    }
678            });
679
680            // give time for socket buffer to fill up.
681            Thread.sleep(5*1000);
682
683            // signal handler to stop further writing
684            continueWriting.set(false);
685
686            // read until done
687            ByteBuffer buf = ByteBuffer.allocateDirect(4096);
688            long total = 0L;
689            do {
690                int n = sc.read(buf);
691                if (n <= 0)
692                    throw new RuntimeException("No bytes read");
693                buf.rewind();
694                total += n;
695            } while (total < bytesWritten.get());
696
697            ch.close();
698            sc.close();
699        }
700    }
701
702    static void testShutdown() throws Exception {
703        System.out.println("-- shutdown--");
704
705        try (Server server = new Server();
706             AsynchronousSocketChannel ch = AsynchronousSocketChannel.open())
707        {
708            ch.connect(server.address()).get();
709            try (SocketChannel peer = server.accept()) {
710                ByteBuffer buf = ByteBuffer.allocateDirect(1000);
711                int n;
712
713                // check read
714                ch.shutdownInput();
715                n = ch.read(buf).get();
716                if (n != -1)
717                    throw new RuntimeException("-1 expected");
718                // check full with full buffer
719                buf.put(new byte[100]);
720                n = ch.read(buf).get();
721                if (n != -1)
722                    throw new RuntimeException("-1 expected");
723
724                // check write
725                ch.shutdownOutput();
726                try {
727                    ch.write(buf).get();
728                    throw new RuntimeException("ClosedChannelException expected");
729                } catch (ExecutionException x) {
730                    if (!(x.getCause() instanceof ClosedChannelException))
731                        throw new RuntimeException("ClosedChannelException expected");
732                }
733            }
734        }
735    }
736
737    static void testTimeout() throws Exception {
738        System.out.println("-- timeouts --");
739        testTimeout(Integer.MIN_VALUE, TimeUnit.SECONDS);
740        testTimeout(-1L, TimeUnit.SECONDS);
741        testTimeout(0L, TimeUnit.SECONDS);
742        testTimeout(2L, TimeUnit.SECONDS);
743    }
744
745    static void testTimeout(final long timeout, final TimeUnit unit) throws Exception {
746        try (Server server = new Server()) {
747            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
748            ch.connect(server.address()).get();
749
750            ByteBuffer dst = ByteBuffer.allocate(512);
751
752            final AtomicReference<Throwable> readException = new AtomicReference<Throwable>();
753
754            // this read should timeout if value is > 0
755            ch.read(dst, timeout, unit, null, new CompletionHandler<Integer,Void>() {
756                public void completed(Integer result, Void att) {
757                    readException.set(new RuntimeException("Should not complete"));
758                }
759                public void failed(Throwable exc, Void att) {
760                    readException.set(exc);
761                }
762            });
763            if (timeout > 0L) {
764                // wait for exception
765                while (readException.get() == null) {
766                    Thread.sleep(100);
767                }
768                if (!(readException.get() instanceof InterruptedByTimeoutException))
769                    throw new RuntimeException("InterruptedByTimeoutException expected");
770
771                // after a timeout then further reading should throw unspecified runtime exception
772                boolean exceptionThrown = false;
773                try {
774                    ch.read(dst);
775                } catch (RuntimeException x) {
776                    exceptionThrown = true;
777                }
778                if (!exceptionThrown)
779                    throw new RuntimeException("RuntimeException expected after timeout.");
780            } else {
781                Thread.sleep(1000);
782                Throwable exc = readException.get();
783                if (exc != null)
784                    throw new RuntimeException(exc);
785            }
786
787            final AtomicReference<Throwable> writeException = new AtomicReference<Throwable>();
788
789            // write bytes to fill socket buffer
790            ch.write(genBuffer(), timeout, unit, ch,
791                new CompletionHandler<Integer,AsynchronousSocketChannel>()
792            {
793                public void completed(Integer result, AsynchronousSocketChannel ch) {
794                    ch.write(genBuffer(), timeout, unit, ch, this);
795                }
796                public void failed(Throwable exc, AsynchronousSocketChannel ch) {
797                    writeException.set(exc);
798                }
799            });
800            if (timeout > 0) {
801                // wait for exception
802                while (writeException.get() == null) {
803                    Thread.sleep(100);
804                }
805                if (!(writeException.get() instanceof InterruptedByTimeoutException))
806                    throw new RuntimeException("InterruptedByTimeoutException expected");
807
808                // after a timeout then further writing should throw unspecified runtime exception
809                boolean exceptionThrown = false;
810                try {
811                    ch.write(genBuffer());
812                } catch (RuntimeException x) {
813                    exceptionThrown = true;
814                }
815                if (!exceptionThrown)
816                    throw new RuntimeException("RuntimeException expected after timeout.");
817            } else {
818                Thread.sleep(1000);
819                Throwable exc = writeException.get();
820                if (exc != null)
821                    throw new RuntimeException(exc);
822            }
823
824            // clean-up
825            server.accept().close();
826            ch.close();
827        }
828    }
829
830    // returns ByteBuffer with random bytes
831    static ByteBuffer genBuffer() {
832        int size = 1024 + rand.nextInt(16000);
833        byte[] buf = new byte[size];
834        rand.nextBytes(buf);
835        boolean useDirect = rand.nextBoolean();
836        if (useDirect) {
837            ByteBuffer bb = ByteBuffer.allocateDirect(buf.length);
838            bb.put(buf);
839            bb.flip();
840            return bb;
841        } else {
842            return ByteBuffer.wrap(buf);
843        }
844    }
845
846    // return ByteBuffer[] with random bytes
847    static ByteBuffer[] genBuffers(int max) {
848        int len = 1;
849        if (max > 1)
850            len += rand.nextInt(max);
851        ByteBuffer[] bufs = new ByteBuffer[len];
852        for (int i=0; i<len; i++)
853            bufs[i] = genBuffer();
854        return bufs;
855    }
856
857    // return random SocketAddress
858    static SocketAddress genSocketAddress() {
859        StringBuilder sb = new StringBuilder("10.");
860        sb.append(rand.nextInt(256));
861        sb.append('.');
862        sb.append(rand.nextInt(256));
863        sb.append('.');
864        sb.append(rand.nextInt(256));
865        InetAddress rh;
866        try {
867            rh = InetAddress.getByName(sb.toString());
868        } catch (UnknownHostException x) {
869            throw new InternalError("Should not happen");
870        }
871        return new InetSocketAddress(rh, rand.nextInt(65535)+1);
872    }
873}
874