Unbounded.java revision 893:f06f30b29f36
1/*
2 * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
20 * CA 95054 USA or visit www.sun.com if you need additional information or
21 * have any questions.
22 */
23
24/* @test
25 * @bug 4607272
26 * @summary Unit test for AsynchronousChannelGroup
27 */
28
29import java.nio.ByteBuffer;
30import java.nio.channels.*;
31import java.net.*;
32import java.util.concurrent.*;
33import java.io.IOException;
34
/**
 * Opens {@link #CONCURRENCY_COUNT} loopback connections, starts an asynchronous
 * read on every client channel and then writes to each accepted peer so that
 * every read completes. Each read completion handler blocks on a barrier shared
 * with the main thread; the barrier has CONCURRENCY_COUNT+1 parties, so it only
 * trips once all handlers are blocked in it at the same time.
 *
 * NOTE(review): presumably this is what demonstrates that the default channel
 * group runs an unbounded number of handlers concurrently - confirm against
 * the AsynchronousChannelGroup specification.
 *
 * I/O failures reported to a completion handler are recorded and rethrown by
 * the main thread rather than silently dropped.
 */
public class Unbounded {
    // number of concurrent completion handlers
    static final int CONCURRENCY_COUNT = 512;

    // set by main just before it closes the listener, so that the failure of
    // the accept that is still pending at that point is not reported
    private static volatile boolean finished;

    // first failure reported by a completion handler; guarded by the class lock
    private static Throwable failure;

    /**
     * Logs a failure seen by a completion handler and remembers the first one
     * so that the main thread can fail the test with it.
     */
    private static synchronized void recordFailure(String what, Throwable exc) {
        System.err.println(what + ": " + exc);
        if (failure == null)
            failure = exc;
    }

    /**
     * Throws an AssertionError wrapping the first recorded handler failure,
     * if there is one.
     */
    private static synchronized void checkNoFailure() {
        if (failure != null)
            throw new AssertionError(failure);
    }

    /**
     * Runs the test.
     *
     * @param args ignored
     * @throws Exception if setting up the connections fails, if the main
     *         thread is interrupted, or if a write to an accepted channel fails
     * @throws AssertionError if a completion handler reported a failure
     */
    public static void main(String[] args) throws Exception {
        // all accepted connections are added to a queue
        final ArrayBlockingQueue<AsynchronousSocketChannel> queue =
            new ArrayBlockingQueue<AsynchronousSocketChannel>(CONCURRENCY_COUNT);

        // create listener to accept connections
        final AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(0));
        try {
            listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
                public void completed(AsynchronousSocketChannel ch, Void att) {
                    queue.add(ch);
                    // re-arm so that the next connection is accepted too
                    listener.accept(null, this);
                }
                public void failed(Throwable exc, Void att) {
                    // the accept left pending when main closes the listener
                    // fails as well; only failures before that are errors
                    if (!finished)
                        recordFailure("accept failed", exc);
                }
                public void cancelled(Void att) {
                    failed(new CancellationException("accept cancelled"), att);
                }
            });
            System.out.println("Listener created.");

            // establish lots of connections
            int port = ((InetSocketAddress)(listener.getLocalAddress())).getPort();
            SocketAddress sa = new InetSocketAddress(InetAddress.getLocalHost(), port);
            AsynchronousSocketChannel[] channels =
                new AsynchronousSocketChannel[CONCURRENCY_COUNT];
            for (int i=0; i<CONCURRENCY_COUNT; i++) {
                int attempts = 0;
                for (;;) {
                    try {
                        channels[i] = AsynchronousSocketChannel.open();
                        channels[i].connect(sa).get();
                        break;
                    } catch (IOException x) {
                        // probably resource issue so back off and retry
                        if (++attempts >= 3)
                            throw x;
                        Thread.sleep(50);
                    }
                }
            }
            System.out.println("All connection established.");

            // the barrier where all threads (plus the main thread) wait
            final CyclicBarrier barrier = new CyclicBarrier(CONCURRENCY_COUNT+1);

            // initiate a read operation on each channel.
            for (int i=0; i<CONCURRENCY_COUNT; i++) {
                ByteBuffer buf = ByteBuffer.allocateDirect(100);
                channels[i].read( buf, channels[i],
                    new CompletionHandler<Integer,AsynchronousSocketChannel>() {
                        public void completed(Integer bytesRead, AsynchronousSocketChannel ch) {
                            // a close failure must not stop this handler from
                            // arriving at the barrier, otherwise main would
                            // wait for it forever
                            try {
                                ch.close();
                            } catch (IOException x) {
                                recordFailure("close failed", x);
                            }
                            try {
                                barrier.await();
                            } catch (Exception x) {
                                throw new AssertionError(x);
                            }
                        }
                        public void failed(Throwable exc, AsynchronousSocketChannel ch) {
                            recordFailure("read failed", exc);
                            // still arrive at the barrier so that main is
                            // released and can report the failure
                            completed(0, ch);
                        }
                        public void cancelled(AsynchronousSocketChannel ch) {
                            failed(new CancellationException("read cancelled"), ch);
                        }
                    });
            }
            System.out.println("All read operations outstanding.");

            // write data to each of the accepted connections
            int remaining = CONCURRENCY_COUNT;
            while (remaining > 0) {
                // poll rather than take so that a failed accept is noticed
                // instead of blocking here indefinitely
                AsynchronousSocketChannel ch = queue.poll(1, TimeUnit.SECONDS);
                if (ch == null) {
                    checkNoFailure();
                    continue;
                }
                try {
                    // explicit charset: do not depend on the platform default
                    ch.write(ByteBuffer.wrap("welcome".getBytes("US-ASCII"))).get();
                } finally {
                    ch.close();
                }
                remaining--;
            }

            // wait for all threads to reach the barrier
            System.out.println("Waiting for all threads to reach barrier");
            barrier.await();

            // every handler has arrived; fail if any of them saw an error
            checkNoFailure();
        } finally {
            finished = true;
            listener.close();
        }
    }
}
121