1/*
2 * Copyright (c) 2009, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24/* @test
25 * @bug 6863110
26 * @summary Newly connected/accepted SctpChannel should fire OP_READ if registered with a Selector
27 * @author chegar
28 */
29
30import java.net.InetSocketAddress;
31import java.net.SocketAddress;
32import java.io.IOException;
33import java.util.Iterator;
34import java.util.Set;
35import java.util.concurrent.CountDownLatch;
36import java.nio.ByteBuffer;
37import java.nio.channels.Selector;
38import java.nio.channels.SelectionKey;
39import com.sun.nio.sctp.AbstractNotificationHandler;
40import com.sun.nio.sctp.AssociationChangeNotification;
41import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
42import com.sun.nio.sctp.HandlerResult;
43import com.sun.nio.sctp.Notification;
44import com.sun.nio.sctp.SctpChannel;
45import com.sun.nio.sctp.SctpServerChannel;
46import com.sun.nio.sctp.ShutdownNotification;
47import static java.lang.System.out;
48import static java.lang.System.err;
49import static java.nio.channels.SelectionKey.OP_CONNECT;
50import static java.nio.channels.SelectionKey.OP_READ;
51
52public class CommUp {
    /* Not referenced anywhere else in the visible part of this file. */
    static CountDownLatch acceptLatch = new CountDownLatch(1);
    /* Timeout, in milliseconds, passed to every Selector.select call below. */
    static final int TIMEOUT = 10000;

    /* Handler passed to receive() on the connecting (client) channel. */
    CommUpNotificationHandler clientHandler = new CommUpNotificationHandler();
    /* Handler passed to receive() on the accepted (server side) channel. */
    CommUpNotificationHandler serverHandler = new CommUpNotificationHandler();
    /* Local server; only assigned when no peer address is given in args. */
    CommUpServer server;
    /* Thread running the client; the server interrupts it if its select
     * returns no ready keys. */
    Thread clientThread;
60
61    void test(String[] args) {
62        SocketAddress address = null;
63
64        if (!Util.isSCTPSupported()) {
65            out.println("SCTP protocol is not supported");
66            out.println("Test cannot be run");
67            return;
68        }
69
70        if (args.length == 2) {
71            /* requested to connecct to a specific address */
72            try {
73                int port = Integer.valueOf(args[1]);
74                address = new InetSocketAddress(args[0], port);
75            } catch (NumberFormatException nfe) {
76                err.println(nfe);
77            }
78        } else {
79            /* start server on local machine, default */
80            try {
81                server = new CommUpServer();
82                server.start();
83                address = server.address();
84                debug("Server started and listening on " + address);
85            } catch (IOException ioe) {
86                ioe.printStackTrace();
87                return;
88            }
89        }
90
91        /* store the main thread so that the server can interrupt it, if necessary */
92        clientThread = Thread.currentThread();
93
94        doClient(address);
95    }
96
97    void doClient(SocketAddress peerAddress) {
98        SctpChannel sc = null;
99        try {
100            debug("connecting to " + peerAddress);
101            sc = SctpChannel.open();
102            sc.configureBlocking(false);
103            check(sc.isBlocking() == false, "Should be in non-blocking mode");
104            sc.connect(peerAddress);
105
106            Selector selector = Selector.open();
107            SelectionKey selectiontKey = sc.register(selector, OP_CONNECT);
108
109            /* Expect two interest Ops */
110            boolean opConnectReceived = false;
111            boolean opReadReceived = false;
112            for (int z=0; z<2; z++) {
113                debug("select " + z);
114                int keysAdded = selector.select(TIMEOUT);
115                debug("returned " + keysAdded + " keys");
116                if (keysAdded > 0) {
117                    Set<SelectionKey> keys = selector.selectedKeys();
118                    Iterator<SelectionKey> i = keys.iterator();
119                    while(i.hasNext()) {
120                        SelectionKey sk = i.next();
121                        i.remove();
122                        SctpChannel readyChannel =
123                            (SctpChannel)sk.channel();
124
125                        /* OP_CONNECT */
126                        if (sk.isConnectable()) {
127                            /* some trivial checks */
128                            check(opConnectReceived == false,
129                                  "should only received one OP_CONNECT");
130                            check(opReadReceived == false,
131                                  "should not receive OP_READ before OP_CONNECT");
132                            check(readyChannel.equals(sc),
133                                  "channels should be equal");
134                            check(!sk.isAcceptable(),
135                                  "key should not be acceptable");
136                            check(!sk.isReadable(),
137                                  "key should not be readable");
138                            check(!sk.isWritable(),
139                                  "key should not be writable");
140
141                            /* now process the OP_CONNECT */
142                            opConnectReceived = true;
143                            check((sk.interestOps() & OP_CONNECT) == OP_CONNECT,
144                                  "selection key interest ops should contain OP_CONNECT");
145                            sk.interestOps(OP_READ);
146                            check((sk.interestOps() & OP_CONNECT) != OP_CONNECT,
147                                  "selection key interest ops should not contain OP_CONNECT");
148                            check(sc.finishConnect(),
149                                  "finishConnect should return true");
150                        } /* OP_READ */
151                          else if (sk.isReadable()) {
152                            /* some trivial checks */
153                            check(opConnectReceived == true,
154                                  "should receive one OP_CONNECT before OP_READ");
155                            check(opReadReceived == false,
156                                  "should not receive OP_READ before OP_CONNECT");
157                            check(readyChannel.equals(sc),
158                                  "channels should be equal");
159                            check(!sk.isAcceptable(),
160                                  "key should not be acceptable");
161                            check(sk.isReadable(),
162                                  "key should be readable");
163                            check(!sk.isWritable(),
164                                  "key should not be writable");
165                            check(!sk.isConnectable(),
166                                  "key should not be connectable");
167
168                            /* now process the OP_READ */
169                            opReadReceived = true;
170                            selectiontKey.cancel();
171
172                            /* try with small buffer to see if native
173                             * implementation can handle this */
174                            ByteBuffer buffer = ByteBuffer.allocateDirect(1);
175                            readyChannel.receive(buffer, null, clientHandler);
176                            check(clientHandler.receivedCommUp(),
177                                    "Client should have received COMM_UP");
178
179                            /* dont close (or put anything on) the channel until
180                             * we check that the server's accepted channel also
181                             * received COMM_UP */
182                            serverHandler.waitForCommUp();
183                        } else {
184                            fail("Unexpected selection key");
185                        }
186                    }
187                } else {
188                    fail("Client selector returned 0 ready keys");
189                    /* stop the server */
190                    server.thread().interrupt();
191                }
192            } //for
193
194        } catch (IOException ioe) {
195            unexpected(ioe);
196        } catch (InterruptedException ie) {
197            unexpected(ie);
198        }
199    }
200
201    class CommUpServer implements Runnable
202    {
203        final InetSocketAddress serverAddr;
204        private SctpServerChannel ssc;
205        private Thread serverThread;
206
207        public CommUpServer() throws IOException {
208            ssc = SctpServerChannel.open().bind(null);
209            java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
210            if (addrs.isEmpty())
211                debug("addrs should not be empty");
212
213            serverAddr = (InetSocketAddress) addrs.iterator().next();
214        }
215
216        void start() {
217            serverThread = new Thread(this, "CommUpServer-"  +
218                                              serverAddr.getPort());
219            serverThread.start();
220        }
221
222        InetSocketAddress address () {
223            return serverAddr;
224        }
225
226        Thread thread() {
227            return serverThread;
228        }
229
230        @Override
231        public void run() {
232            Selector selector = null;
233            SctpChannel sc = null;
234            SelectionKey readKey = null;
235            try {
236                sc = ssc.accept();
237                debug("accepted " + sc);
238
239                selector = Selector.open();
240                sc.configureBlocking(false);
241                check(sc.isBlocking() == false, "Should be in non-blocking mode");
242                readKey = sc.register(selector, SelectionKey.OP_READ);
243
244                debug("select");
245                int keysAdded = selector.select(TIMEOUT);
246                debug("returned " + keysAdded + " keys");
247                if (keysAdded > 0) {
248                    Set<SelectionKey> keys = selector.selectedKeys();
249                    Iterator<SelectionKey> i = keys.iterator();
250                    while(i.hasNext()) {
251                        SelectionKey sk = i.next();
252                        i.remove();
253                        SctpChannel readyChannel =
254                            (SctpChannel)sk.channel();
255                        check(readyChannel.equals(sc),
256                                "channels should be equal");
257                        check(!sk.isAcceptable(),
258                                "key should not be acceptable");
259                        check(sk.isReadable(),
260                                "key should be readable");
261                        check(!sk.isWritable(),
262                                "key should not be writable");
263                        check(!sk.isConnectable(),
264                                "key should not be connectable");
265
266                        /* block until we check if the client has received its COMM_UP*/
267                        clientHandler.waitForCommUp();
268
269                        ByteBuffer buffer = ByteBuffer.allocateDirect(1);
270                        sc.receive(buffer, null, serverHandler);
271                        check(serverHandler.receivedCommUp(),
272                                "Accepted channel should have received COMM_UP");
273                    }
274                } else {
275                   fail("Server selector returned 0 ready keys");
276                   /* stop the client */
277                   clientThread.interrupt();
278            }
279            } catch (IOException ioe) {
280                ioe.printStackTrace();
281            } catch (InterruptedException unused) {
282            } finally {
283                if (readKey != null) readKey.cancel();
284                try { if (selector != null) selector.close(); }
285                catch (IOException  ioe) { unexpected(ioe); }
286                try { if (ssc != null) ssc.close(); }
287                catch (IOException  ioe) { unexpected(ioe); }
288                try { if (sc != null) sc.close(); }
289                catch (IOException  ioe) { unexpected(ioe); }
290            }
291        }
292    }
293
294    class CommUpNotificationHandler extends AbstractNotificationHandler<Object>
295    {
296        private boolean receivedCommUp;  // false
297
298        public synchronized boolean receivedCommUp() {
299            return receivedCommUp;
300        }
301
302        public synchronized boolean waitForCommUp() throws InterruptedException {
303            while (receivedCommUp == false) {
304                wait();
305            }
306
307            return false;
308        }
309
310        @Override
311        public HandlerResult handleNotification(
312                Notification notification, Object attachment) {
313            fail("Unknown notification type");
314            return HandlerResult.CONTINUE;
315        }
316
317        @Override
318        public synchronized HandlerResult handleNotification(
319                AssociationChangeNotification notification, Object attachment) {
320            AssocChangeEvent event = notification.event();
321            debug("AssociationChangeNotification");
322            debug("  Association: " + notification.association());
323            debug("  Event: " + event);
324
325            if (event.equals(AssocChangeEvent.COMM_UP)) {
326                receivedCommUp = true;
327                notifyAll();
328            }
329
330            return HandlerResult.RETURN;
331        }
332
333        @Override
334        public HandlerResult handleNotification(
335                ShutdownNotification notification, Object attachment) {
336            debug("ShutdownNotification");
337            debug("  Association: " + notification.association());
338            return HandlerResult.RETURN;
339        }
340    }
341
342        //--------------------- Infrastructure ---------------------------
343    boolean debug = true;
344    volatile int passed = 0, failed = 0;
345    void pass() {passed++;}
346    void fail() {failed++; Thread.dumpStack();}
347    void fail(String msg) {err.println(msg); fail();}
348    void unexpected(Throwable t) {failed++; t.printStackTrace();}
349    void check(boolean cond) {if (cond) pass(); else fail();}
350    void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
351    void debug(String message) {if(debug) { out.println(Thread.currentThread().getName() + ": " + message); }  }
352    void sleep(long millis) { try { Thread.currentThread().sleep(millis); }
353                          catch(InterruptedException ie) { unexpected(ie); }}
354    public static void main(String[] args) throws Throwable {
355        Class<?> k = new Object(){}.getClass().getEnclosingClass();
356        try {k.getMethod("instanceMain",String[].class)
357                .invoke( k.newInstance(), (Object) args);}
358        catch (Throwable e) {throw e.getCause();}}
359    public void instanceMain(String[] args) throws Throwable {
360        try {test(args);} catch (Throwable t) {unexpected(t);}
361        out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
362        if (failed > 0) throw new AssertionError("Some tests failed");}
363
364}
365