1/*
2 * Copyright 2009 Google Inc.  All Rights Reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24import java.net.InetSocketAddress;
25import java.net.SocketAddress;
26import java.nio.channels.SelectionKey;
27import java.nio.channels.Selector;
28import java.nio.channels.ServerSocketChannel;
29import java.nio.channels.SocketChannel;
30import java.util.ArrayList;
31import java.util.Iterator;
32import java.util.List;
33
/**
 * Reproduces O(N^2) behavior of JDK6/7 select() call. This happens when
 * a selector has many unprocessed updates to its interest set (e.g. adding
 * OP_READ on a bunch of newly accepted sockets). The O(N^2) is triggered
 * by cancelling a number of selection keys (or just closing a few sockets).
 * In this case, select() will first go through the list of cancelled keys
 * and try to deregister them. That deregistration is O(N^2) over the list
 * of unprocessed updates to the interest set.
 *
 * <p> This O(N^2) behavior is a BUG in JVM and should be fixed.
 *
 * <p> The test first creates initCount connections, and adds them
 * to the server epoll set. It then creates massCount connections,
 * registers interest (causing updateList to be populated with massCount*2
 * elements), but does not add them to epoll set (that would've cleared
 * updateList). The test then closes initCount connections, thus populating
 * deregistration queue. The subsequent call to selectNow() will first process
 * deregistration queue, performing O(N^2) over updateList size,
 * equal to massCount * 2.
 *
 * <p> Note that connect rate is artificially slowed down to compensate
 * for what I believe is a Linux bug, where too high of a connection rate
 * ends up in SYN's being dropped and then slow retransmits.
 *
 * <p> The server listens on a system-assigned (ephemeral) loopback port so
 * that the test does not fail merely because a fixed port is already in use.
 *
 * @author Igor Chernyshev
 */
public class LotsOfCancels {

    /** Value of {@link System#nanoTime} when the current test run began. */
    static long testStartTime;

    public static void main(String[] args) throws Exception {
        // the final select should run in less than 1000ms.
        runTest(500, 2700, 1000);
    }

    /**
     * Prints msg to standard output, prefixed with the number of
     * milliseconds elapsed since the test started.
     */
    static void log(String msg) {
        System.out.println(getLogPrefix() + msg);
    }

    /**
     * Returns the prefix used by {@link #log}: the elapsed test time in
     * milliseconds followed by {@code ": "}.
     */
    static String getLogPrefix() {
        return durationMillis(testStartTime) + ": ";
    }

    /**
     * Returns the elapsed time since startNanos, in milliseconds.
     * @param startNanos the start time; this must be a value returned
     * by {@link System#nanoTime}
     */
    static long durationMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / (1000L * 1000L);
    }

    /**
     * Runs the whole scenario and exits the VM with status 1 if the final
     * selectNow() takes longer than allowed.
     *
     * @param initCount number of connections that are fully registered and
     *        later closed to populate the selector's cancelled-key set
     * @param massCount number of connections whose interest-set updates are
     *        left unprocessed before the timed select
     * @param maxSelectTime maximum acceptable duration of the final
     *        selectNow(), in milliseconds
     * @throws Exception if setup fails or fewer connections than requested
     *         could be accepted
     */
    static void runTest(int initCount, int massCount, int maxSelectTime)
            throws Exception {
        testStartTime = System.nanoTime();

        log("Setting up server");
        Selector serverSelector = Selector.open();
        try {
            ServerSocketChannel server = ServerSocketChannel.open();
            try {
                runWithServer(serverSelector, server,
                              initCount, massCount, maxSelectTime);
            } finally {
                server.close();
            }
        } finally {
            serverSelector.close();
        }
    }

    /**
     * Body of {@link #runTest}: drives the scenario against an already
     * opened selector and server channel, and closes every accepted channel
     * before returning normally or exceptionally.
     */
    private static void runWithServer(Selector serverSelector,
                                      ServerSocketChannel server,
                                      int initCount,
                                      int massCount,
                                      int maxSelectTime)
            throws Exception {
        // Every accepted channel ends up here so it can be closed on exit.
        List<SocketChannel> accepted = new ArrayList<SocketChannel>();
        try {
            // Create server channel, add it to selector and run epoll_ctl.
            // Port 0 lets the system pick a free port; clients are then
            // pointed at whatever port was actually assigned.
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress("127.0.0.1", 0), 5000);
            InetSocketAddress address = new InetSocketAddress(
                "127.0.0.1", server.socket().getLocalPort());
            server.register(serverSelector, SelectionKey.OP_ACCEPT);
            serverSelector.selectNow();

            log("Setting up client");
            ClientThread client = new ClientThread(address);
            client.start();
            Thread.sleep(100);

            // Set up initial set of client sockets.
            log("Starting initial client connections");
            client.connectClients(initCount);
            Thread.sleep(500);  // Wait for client connections to arrive

            // Accept all initial client sockets, add to selector and run
            // epoll_ctl.
            log("Accepting initial connections");
            List<SocketChannel> serverChannels1 =
                acceptAndAddAll(serverSelector, server, initCount);
            accepted.addAll(serverChannels1);
            if (serverChannels1.size() != initCount) {
                throw new Exception("Accepted " + serverChannels1.size() +
                                    " instead of " + initCount);
            }
            serverSelector.selectNow();

            // Set up mass set of client sockets.
            log("Requesting mass client connections");
            client.connectClients(massCount);
            Thread.sleep(500);  // Wait for client connections to arrive

            // Accept all mass client sockets, add to selector and do NOT
            // run epoll_ctl.
            log("Accepting mass connections");
            List<SocketChannel> serverChannels2 =
                acceptAndAddAll(serverSelector, server, massCount);
            accepted.addAll(serverChannels2);
            if (serverChannels2.size() != massCount) {
                throw new Exception("Accepted " + serverChannels2.size() +
                                    " instead of " + massCount);
            }

            // Close initial set of sockets.
            log("Closing initial connections");
            closeAll(serverChannels1);

            // Now get the timing of select() call.
            log("Running the final select call");
            long startTime = System.nanoTime();
            serverSelector.selectNow();
            long duration = durationMillis(startTime);
            log("Init count = " + initCount +
                ", mass count = " + massCount +
                ", duration = " + duration + "ms");

            if (duration > maxSelectTime) {
                System.out.println
                    ("\n\n\n\n\nFAILURE: The final selectNow() took " +
                     duration + "ms " +
                     "- seems like O(N^2) bug is still here\n\n");
                System.exit(1);
            }
        } finally {
            // Closing an already closed channel is a no-op, so the initial
            // set may safely be closed a second time here.
            closeAll(accepted);
        }
    }

    /**
     * Accepts up to {@code expected} pending connections from the
     * non-blocking server channel and registers each with the selector,
     * first for OP_READ and then, in a second pass, for OP_WRITE.
     *
     * <p> When accept() returns null the method sleeps 500ms and retries;
     * it gives up after 10 consecutive empty retries, in which case the
     * returned list is shorter than {@code expected}.
     *
     * @return the accepted channels, in accept order
     */
    static List<SocketChannel> acceptAndAddAll(Selector selector,
                                               ServerSocketChannel server,
                                               int expected)
            throws Exception {
        int retryCount = 0;
        int acceptCount = 0;
        List<SocketChannel> channels = new ArrayList<SocketChannel>();
        while (channels.size() < expected) {
            SocketChannel channel = server.accept();
            if (channel == null) {
                log("accept() returned null " +
                    "after accepting " + acceptCount + " more connections");
                acceptCount = 0;
                if (retryCount < 10) {
                    // See if more new sockets got stacked behind.
                    retryCount++;
                    Thread.sleep(500);
                    continue;
                }
                break;
            }
            retryCount = 0;
            acceptCount++;
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            channels.add(channel);
        }
        // Cause an additional updateList entry per channel.
        for (SocketChannel channel : channels) {
            channel.register(selector, SelectionKey.OP_WRITE);
        }
        return channels;
    }

    /** Closes every channel in the list, in order. */
    static void closeAll(List<SocketChannel> channels)
            throws Exception {
        for (SocketChannel channel : channels) {
            channel.close();
        }
    }

    /**
     * Daemon thread that opens non-blocking client connections to the server
     * on request. Connections are started in small batches with a pause
     * after each batch to keep the connect rate low. The thread loops
     * forever; any failure is printed and terminates the VM with status 1.
     */
    static class ClientThread extends Thread {
        /** Maximum number of connections started per loop iteration. */
        private static final int MAX_BATCH = 20;
        /** Pause after each batch of connection attempts, in milliseconds. */
        private static final long BATCH_PAUSE_MILLIS = 200;

        private final SocketAddress address;
        private final Selector selector;
        /** Outstanding connection requests; guarded by {@code this}. */
        private int connectionsNeeded;
        /** Connections started so far; touched only by this thread. */
        private int totalCreated;

        ClientThread(SocketAddress address) throws Exception {
            this.address = address;
            selector = Selector.open();
            setDaemon(true);
        }

        /**
         * Requests {@code count} additional connections and wakes the client
         * selector so that the request is noticed promptly.
         */
        void connectClients(int count) throws Exception {
            synchronized (this) {
                connectionsNeeded += count;
            }
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                handleClients();
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(1);
            }
        }

        /**
         * Main loop: start the next batch of connections if any were
         * requested, otherwise block in select(); then finish whatever
         * connections the selector reports as ready.
         */
        private void handleClients() throws Exception {
            int selectCount = 0;
            while (true) {
                int createdCount = connectBatch();

                if (createdCount > 0) {
                    selector.selectNow();
                } else {
                    selectCount++;
                    long startTime = System.nanoTime();
                    selector.select();
                    long duration = durationMillis(startTime);
                    log("Exited clientSelector.select(), loop #"
                        + selectCount + ", duration = " + duration + "ms");
                }

                processSelectedKeys();
            }
        }

        /**
         * Starts at most {@link #MAX_BATCH} of the requested connections and
         * then pauses. Only the request counter is updated under the lock;
         * socket I/O and the pause happen outside it so that
         * {@link #connectClients} never has to wait for them.
         *
         * @return the number of connections started, possibly zero
         */
        private int connectBatch() throws Exception {
            int batch;
            synchronized (this) {
                batch = Math.max(0, Math.min(connectionsNeeded, MAX_BATCH));
                connectionsNeeded -= batch;
            }
            if (batch == 0) {
                return 0;
            }

            for (int i = 0; i < batch; i++) {
                totalCreated++;

                SocketChannel channel = SocketChannel.open();
                channel.configureBlocking(false);
                channel.connect(address);
                // Only channels whose connect is still pending need the
                // selector to report completion.
                if (!channel.finishConnect()) {
                    channel.register(selector,
                                     SelectionKey.OP_CONNECT);
                }
            }

            log("Started total of " +
                totalCreated + " client connections");
            Thread.sleep(BATCH_PAUSE_MILLIS);
            return batch;
        }

        /**
         * Drains the selected-key set. Keys ready for exactly OP_CONNECT get
         * their interest set cleared and their connection finished; invalid
         * keys and any other readiness are only logged.
         */
        private void processSelectedKeys() throws Exception {
            int keyCount = -1;
            Iterator<SelectionKey> keys =
                selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                synchronized (key) {
                    keyCount++;
                    keys.remove();
                    if (!key.isValid()) {
                        log("Ignoring client key #" + keyCount);
                        continue;
                    }
                    int readyOps = key.readyOps();
                    if (readyOps == SelectionKey.OP_CONNECT) {
                        key.interestOps(0);
                        ((SocketChannel) key.channel()).finishConnect();
                    } else {
                        log("readyOps() on client key #" + keyCount +
                            " returned " + readyOps);
                    }
                }
            }
        }
    }
}
293