1/*
2 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24/* @test
25 * @bug 4927640
26 * @summary Tests the SCTP protocol implementation
27 * @author chegar
28 */
29
30import java.net.InetSocketAddress;
31import java.net.SocketAddress;
32import java.io.IOException;
33import java.util.concurrent.CountDownLatch;
34import java.util.concurrent.TimeUnit;
35import java.nio.ByteBuffer;
36import java.nio.channels.NotYetConnectedException;
37import java.nio.channels.ClosedChannelException;
38import com.sun.nio.sctp.AbstractNotificationHandler;
39import com.sun.nio.sctp.Association;
40import com.sun.nio.sctp.AssociationChangeNotification;
41import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
42import com.sun.nio.sctp.HandlerResult;
43import com.sun.nio.sctp.InvalidStreamException;
44import com.sun.nio.sctp.MessageInfo;
45import com.sun.nio.sctp.Notification;
46import com.sun.nio.sctp.SctpChannel;
47import com.sun.nio.sctp.SctpServerChannel;
48import static java.lang.System.out;
49import static java.lang.System.err;
50
51public class Send {
52    /* Latches used to synchronize between the client and server so that
53     * connections without any IO may not be closed without being accepted */
54    final CountDownLatch clientFinishedLatch = new CountDownLatch(1);
55    final CountDownLatch serverFinishedLatch = new CountDownLatch(1);
56
57    SendNotificationHandler handler = new SendNotificationHandler();
58
59    void test(String[] args) {
60        SocketAddress address = null;
61        Server server = null;
62
63        if (!Util.isSCTPSupported()) {
64            out.println("SCTP protocol is not supported");
65            out.println("Test cannot be run");
66            return;
67        }
68
69        if (args.length == 2) {
70            /* requested to connecct to a specific address */
71            try {
72                int port = Integer.valueOf(args[1]);
73                address = new InetSocketAddress(args[0], port);
74            } catch (NumberFormatException nfe) {
75                err.println(nfe);
76            }
77        } else {
78            /* start server on local machine, default */
79            try {
80                server = new Server();
81                server.start();
82                address = server.address();
83                debug("Server started and listening on " + address);
84            } catch (IOException ioe) {
85                ioe.printStackTrace();
86                return;
87            }
88        }
89
90        doTest(address);
91    }
92
93    void doTest(SocketAddress peerAddress) {
94        SctpChannel channel = null;
95        ByteBuffer buffer = ByteBuffer.allocate(Util.LARGE_BUFFER);
96        MessageInfo info = MessageInfo.createOutgoing(null, 0);
97
98        try {
99            channel = SctpChannel.open();
100
101            /* TEST 1: Verify NotYetConnectedException thrown */
102            try {
103                channel.send(buffer, info);
104                fail("should have thrown NotYetConnectedException");
105            } catch (NotYetConnectedException unused) {
106                pass();
107            }  catch (IOException ioe) {
108                unexpected(ioe);
109            }
110
111            channel.connect(peerAddress);
112            /* Receive CommUp */
113            channel.receive(buffer, null, handler);
114
115            /* TEST 2: send small message */
116            int streamNumber = 0;
117            debug("sending on stream number: " + streamNumber);
118            info = MessageInfo.createOutgoing(null, streamNumber);
119            buffer.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
120            buffer.flip();
121            int position = buffer.position();
122            int remaining = buffer.remaining();
123
124            debug("sending small message: " + buffer);
125            int sent = channel.send(buffer, info);
126
127            check(sent == remaining, "sent should be equal to remaining");
128            check(buffer.position() == (position + sent),
129                    "buffers position should have been incremented by sent");
130
131            buffer.clear();
132
133            /* TEST 3: send large message */
134            streamNumber = handler.maxOutStreams() - 1;
135            debug("sending on stream number: " + streamNumber);
136            info = MessageInfo.createOutgoing(null, streamNumber);
137            buffer.put(Util.LARGE_MESSAGE.getBytes("ISO-8859-1"));
138            buffer.flip();
139            position = buffer.position();
140            remaining = buffer.remaining();
141
142            debug("sending large message: " + buffer);
143            sent = channel.send(buffer, info);
144
145            check(sent == remaining, "sent should be equal to remaining");
146            check(buffer.position() == (position + sent),
147                    "buffers position should have been incremented by sent");
148
149            /* TEST 4: InvalidStreamExcepton */
150            streamNumber = handler.maxInStreams;
151            info = MessageInfo.createOutgoing(null, streamNumber);
152            buffer.clear();
153            buffer.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
154            buffer.flip();
155            position = buffer.position();
156            remaining = buffer.remaining();
157
158            debug("sending on stream number: " + streamNumber);
159            debug("sending small message: " + buffer);
160            try {
161                sent = channel.send(buffer, info);
162                fail("should have thrown InvalidStreamExcepton");
163            } catch (InvalidStreamException ise){
164                pass();
165            } catch (IOException ioe) {
166                unexpected(ioe);
167            }
168            check(buffer.remaining() == remaining,
169                    "remaining should not be changed");
170            check(buffer.position() == position,
171                    "buffers position should not be changed");
172
173            /* TEST 5: Non blocking send should return zero if there is
174               insufficient room in the underlying output buffer */
175            buffer.clear();
176            channel.configureBlocking(false);
177            info = MessageInfo.createOutgoing(null, 1);
178            buffer.put(Util.LARGE_MESSAGE.getBytes("ISO-8859-1"));
179            buffer.flip();
180
181            int count = 0;  // do not loop forever
182            do {
183                position = buffer.position();
184                remaining = buffer.remaining();
185                debug("sending large message: " + buffer);
186                sent = channel.send(buffer, info);
187                if (sent == 0) {
188                    check(buffer.remaining() == remaining,
189                          "remaining should not be changed");
190                    check(buffer.position() == position,
191                          "buffers position should not be changed");
192                }
193                buffer.rewind();
194            } while (sent != 0 && count++ < 100);
195
196            /* TEST 6: ClosedChannelException */
197            channel.close();
198            try {
199                channel.send(buffer, info);
200                fail("should have thrown ClosedChannelException");
201            } catch (ClosedChannelException cce) {
202               pass();
203            } catch (IOException ioe) {
204                unexpected(ioe);
205            }
206
207            /* TEST 7: send without previous receive.
208             * Verify that send can still throw InvalidStreamExcepton */
209            debug("Opening new channel.");
210            channel = SctpChannel.open(peerAddress, 0, 0);
211            streamNumber = Short.MAX_VALUE - 1;
212            info = MessageInfo.createOutgoing(null, streamNumber);
213            buffer.clear();
214            buffer.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
215            buffer.flip();
216            position = buffer.position();
217            remaining = buffer.remaining();
218
219            debug("sending on stream number: " + streamNumber);
220            debug("sending small message: " + buffer);
221            try {
222                sent = channel.send(buffer, info);
223                fail("should have thrown InvalidStreamExcepton");
224            } catch (InvalidStreamException ise){
225                pass();
226            } catch (IOException ioe) {
227                unexpected(ioe);
228            }
229            check(buffer.remaining() == remaining,
230                    "remaining should not be changed");
231            check(buffer.position() == position,
232                    "buffers position should not be changed");
233
234            /* Receive CommUp */
235            channel.receive(buffer, null, handler);
236            check(handler.receivedCommUp(), "should have received COMM_UP");
237
238            /* TEST 8: Send to an invalid preferred SocketAddress */
239            SocketAddress addr = new InetSocketAddress("123.123.123.123", 3456);
240            info = MessageInfo.createOutgoing(addr, 0);
241            debug("sending to " + addr);
242            debug("sending small message: " + buffer);
243            try {
244                sent = channel.send(buffer, info);
245                fail("Invalid address should have thrown an Exception.");
246            } catch (Exception e){
247                pass();
248                debug("OK, caught " + e);
249            }
250
251            /* TEST 9: Send from heap buffer to force implementation to
252             * substitute with a native buffer, then check that its position
253             * is updated correctly */
254            buffer.clear();
255            info = MessageInfo.createOutgoing(null, 0);
256            buffer.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
257            buffer.flip();
258            final int offset = 1;
259            buffer.position(offset);
260            remaining = buffer.remaining();
261
262            debug("sending small message: " + buffer);
263            try {
264                sent = channel.send(buffer, info);
265
266                check(sent == remaining, "sent should be equal to remaining");
267                check(buffer.position() == (offset + sent),
268                        "buffers position should have been incremented by sent");
269            } catch (IllegalArgumentException iae) {
270                fail(iae + ", Error updating buffers position");
271            }
272
273        } catch (IOException ioe) {
274            unexpected(ioe);
275        } finally {
276            clientFinishedLatch.countDown();
277            try { serverFinishedLatch.await(10L, TimeUnit.SECONDS); }
278            catch (InterruptedException ie) { unexpected(ie); }
279            if (channel != null) {
280                try { channel.close(); }
281                catch (IOException e) { unexpected (e);}
282            }
283        }
284    }
285
286    class Server implements Runnable
287    {
288        final InetSocketAddress serverAddr;
289        private SctpServerChannel ssc;
290
291        public Server() throws IOException {
292            ssc = SctpServerChannel.open().bind(null);
293            java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
294            if (addrs.isEmpty())
295                debug("addrs should not be empty");
296
297            serverAddr = (InetSocketAddress) addrs.iterator().next();
298        }
299
300        public void start() {
301            (new Thread(this, "Server-"  + serverAddr.getPort())).start();
302        }
303
304        public InetSocketAddress address() {
305            return serverAddr;
306        }
307
308        @Override
309        public void run() {
310            ByteBuffer buffer = ByteBuffer.allocateDirect(Util.LARGE_BUFFER);
311            SctpChannel sc1 = null, sc2 = null;
312            try {
313                sc1 = ssc.accept();
314
315                /* receive a small message */
316                MessageInfo info;
317                do {
318                    info = sc1.receive(buffer, null, null);
319                    if (info == null) {
320                        fail("Server: unexpected null from receive");
321                            return;
322                    }
323                } while (!info.isComplete());
324
325                buffer.flip();
326                check(info != null, "info is null");
327                check(info.streamNumber() == 0,
328                        "message not sent on the correct stream");
329                check(info.bytes() == Util.SMALL_MESSAGE.getBytes("ISO-8859-1").
330                      length, "bytes received not equal to message length");
331                check(info.bytes() == buffer.remaining(), "bytes != remaining");
332                check(Util.compare(buffer, Util.SMALL_MESSAGE),
333                  "received message not the same as sent message");
334
335                /* receive a large message */
336                buffer.clear();
337                do {
338                    info = sc1.receive(buffer, null, null);
339                    if (info == null) {
340                        fail("Server: unexpected null from receive");
341                            return;
342                    }
343                } while (!info.isComplete());
344
345                buffer.flip();
346                check(info != null, "info is null");
347                check(info.streamNumber() == handler.maxOutStreams() - 1,
348                        "message not sent on the correct stream");
349                check(info.bytes() == Util.LARGE_MESSAGE.getBytes("ISO-8859-1").
350                      length, "bytes received not equal to message length");
351                check(info.bytes() == buffer.remaining(), "bytes != remaining");
352                check(Util.compare(buffer, Util.LARGE_MESSAGE),
353                  "received message not the same as sent message");
354
355                /* TEST 7 ++ */
356                sc2 = ssc.accept();
357
358                /* TEST 9 */
359                ByteBuffer expected = ByteBuffer.allocate(Util.SMALL_BUFFER);
360                expected.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
361                expected.flip();
362                final int offset = 1;
363                expected.position(offset);
364                buffer.clear();
365                do {
366                    info = sc2.receive(buffer, null, null);
367                    if (info == null) {
368                        fail("Server: unexpected null from receive");
369                        return;
370                    }
371                } while (!info.isComplete());
372
373                buffer.flip();
374                check(info != null, "info is null");
375                check(info.streamNumber() == 0, "message not sent on the correct stream");
376                check(info.bytes() == expected.remaining(),
377                      "bytes received not equal to message length");
378                check(info.bytes() == buffer.remaining(), "bytes != remaining");
379                check(expected.equals(buffer),
380                    "received message not the same as sent message");
381
382                clientFinishedLatch.await(10L, TimeUnit.SECONDS);
383                serverFinishedLatch.countDown();
384            } catch (IOException ioe) {
385                unexpected(ioe);
386            } catch (InterruptedException ie) {
387                unexpected(ie);
388            } finally {
389                try { if (ssc != null) ssc.close(); }
390                catch (IOException  unused) {}
391                try { if (sc1 != null) sc1.close(); }
392                catch (IOException  unused) {}
393                try { if (sc2 != null) sc2.close(); }
394                catch (IOException  unused) {}
395            }
396        }
397    }
398
399    class SendNotificationHandler extends AbstractNotificationHandler<Void>
400    {
401        boolean receivedCommUp;  // false
402        int maxInStreams;
403        int maxOutStreams;
404
405        public boolean receivedCommUp() {
406            return receivedCommUp;
407        }
408
409        public int maxInStreams() {
410            return maxInStreams;
411        }
412
413        public int maxOutStreams(){
414            return maxOutStreams;
415        }
416
417        @Override
418        public HandlerResult handleNotification(
419                Notification notification, Void attachment) {
420            fail("Unknown notification type");
421            return HandlerResult.CONTINUE;
422        }
423
424        @Override
425        public HandlerResult handleNotification(
426                AssociationChangeNotification notification, Void attachment) {
427            AssocChangeEvent event = notification.event();
428            Association association = notification.association();
429            debug("AssociationChangeNotification");
430            debug("  Association: " + notification.association());
431            debug("  Event: " + event);
432
433            if (event.equals(AssocChangeEvent.COMM_UP))
434                receivedCommUp = true;
435
436            this.maxInStreams = association.maxInboundStreams();
437            this.maxOutStreams = association.maxOutboundStreams();
438
439            return HandlerResult.RETURN;
440        }
441    }
442
443        //--------------------- Infrastructure ---------------------------
444    boolean debug = true;
445    volatile int passed = 0, failed = 0;
446    void pass() {passed++;}
447    void fail() {failed++; Thread.dumpStack();}
448    void fail(String msg) {System.err.println(msg); fail();}
449    void unexpected(Throwable t) {failed++; t.printStackTrace();}
450    void check(boolean cond) {if (cond) pass(); else fail();}
451    void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
452    void debug(String message) {if(debug) { System.out.println(message); }  }
453    public static void main(String[] args) throws Throwable {
454        Class<?> k = new Object(){}.getClass().getEnclosingClass();
455        try {k.getMethod("instanceMain",String[].class)
456                .invoke( k.newInstance(), (Object) args);}
457        catch (Throwable e) {throw e.getCause();}}
458    public void instanceMain(String[] args) throws Throwable {
459        try {test(args);} catch (Throwable t) {unexpected(t);}
460        System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
461        if (failed > 0) throw new AssertionError("Some tests failed");}
462
463}
464