WindowsSelectorImpl.java revision 12745:f068a4ffddd2
1/*
2 * Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26/*
27 */
28
29
30package sun.nio.ch;
31
32import java.nio.channels.spi.SelectorProvider;
33import java.nio.channels.Selector;
34import java.nio.channels.ClosedSelectorException;
35import java.nio.channels.Pipe;
36import java.nio.channels.SelectableChannel;
37import java.io.IOException;
38import java.nio.channels.CancelledKeyException;
39import java.util.List;
40import java.util.ArrayList;
41import java.util.HashMap;
42import java.util.Iterator;
43import sun.misc.ManagedLocalsThread;
44
45/**
46 * A multi-threaded implementation of Selector for Windows.
47 *
48 * @author Konstantin Kladko
49 * @author Mark Reinhold
50 */
51
52final class WindowsSelectorImpl extends SelectorImpl {
53    // Initial capacity of the poll array
54    private final int INIT_CAP = 8;
55    // Maximum number of sockets for select().
56    // Should be INIT_CAP times a power of 2
57    private static final int MAX_SELECTABLE_FDS = 1024;
58
59    // The list of SelectableChannels serviced by this Selector. Every mod
60    // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
61    // array,  where the corresponding entry is occupied by the wakeupSocket
62    private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
63
64    // The global native poll array holds file decriptors and event masks
65    private PollArrayWrapper pollWrapper;
66
67    // The number of valid entries in  poll array, including entries occupied
68    // by wakeup socket handle.
69    private int totalChannels = 1;
70
71    // Number of helper threads needed for select. We need one thread per
72    // each additional set of MAX_SELECTABLE_FDS - 1 channels.
73    private int threadsCount = 0;
74
75    // A list of helper threads for select.
76    private final List<SelectThread> threads = new ArrayList<SelectThread>();
77
78    //Pipe used as a wakeup object.
79    private final Pipe wakeupPipe;
80
81    // File descriptors corresponding to source and sink
82    private final int wakeupSourceFd, wakeupSinkFd;
83
84    // Lock for close cleanup
85    private Object closeLock = new Object();
86
87    // Maps file descriptors to their indices in  pollArray
88    private static final class FdMap extends HashMap<Integer, MapEntry> {
89        static final long serialVersionUID = 0L;
90        private MapEntry get(int desc) {
91            return get(new Integer(desc));
92        }
93        private MapEntry put(SelectionKeyImpl ski) {
94            return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
95        }
96        private MapEntry remove(SelectionKeyImpl ski) {
97            Integer fd = new Integer(ski.channel.getFDVal());
98            MapEntry x = get(fd);
99            if ((x != null) && (x.ski.channel == ski.channel))
100                return remove(fd);
101            return null;
102        }
103    }
104
105    // class for fdMap entries
106    private static final class MapEntry {
107        SelectionKeyImpl ski;
108        long updateCount = 0;
109        long clearedCount = 0;
110        MapEntry(SelectionKeyImpl ski) {
111            this.ski = ski;
112        }
113    }
114    private final FdMap fdMap = new FdMap();
115
116    // SubSelector for the main thread
117    private final SubSelector subSelector = new SubSelector();
118
119    private long timeout; //timeout for poll
120
121    // Lock for interrupt triggering and clearing
122    private final Object interruptLock = new Object();
123    private volatile boolean interruptTriggered = false;
124
125    WindowsSelectorImpl(SelectorProvider sp) throws IOException {
126        super(sp);
127        pollWrapper = new PollArrayWrapper(INIT_CAP);
128        wakeupPipe = Pipe.open();
129        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
130
131        // Disable the Nagle algorithm so that the wakeup is more immediate
132        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
133        (sink.sc).socket().setTcpNoDelay(true);
134        wakeupSinkFd = ((SelChImpl)sink).getFDVal();
135
136        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
137    }
138
139    protected int doSelect(long timeout) throws IOException {
140        if (channelArray == null)
141            throw new ClosedSelectorException();
142        this.timeout = timeout; // set selector timeout
143        processDeregisterQueue();
144        if (interruptTriggered) {
145            resetWakeupSocket();
146            return 0;
147        }
148        // Calculate number of helper threads needed for poll. If necessary
149        // threads are created here and start waiting on startLock
150        adjustThreadsCount();
151        finishLock.reset(); // reset finishLock
152        // Wakeup helper threads, waiting on startLock, so they start polling.
153        // Redundant threads will exit here after wakeup.
154        startLock.startThreads();
155        // do polling in the main thread. Main thread is responsible for
156        // first MAX_SELECTABLE_FDS entries in pollArray.
157        try {
158            begin();
159            try {
160                subSelector.poll();
161            } catch (IOException e) {
162                finishLock.setException(e); // Save this exception
163            }
164            // Main thread is out of poll(). Wakeup others and wait for them
165            if (threads.size() > 0)
166                finishLock.waitForHelperThreads();
167          } finally {
168              end();
169          }
170        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
171        finishLock.checkForException();
172        processDeregisterQueue();
173        int updated = updateSelectedKeys();
174        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
175        resetWakeupSocket();
176        return updated;
177    }
178
179    // Helper threads wait on this lock for the next poll.
180    private final StartLock startLock = new StartLock();
181
182    private final class StartLock {
183        // A variable which distinguishes the current run of doSelect from the
184        // previous one. Incrementing runsCounter and notifying threads will
185        // trigger another round of poll.
186        private long runsCounter;
187       // Triggers threads, waiting on this lock to start polling.
188        private synchronized void startThreads() {
189            runsCounter++; // next run
190            notifyAll(); // wake up threads.
191        }
192        // This function is called by a helper thread to wait for the
193        // next round of poll(). It also checks, if this thread became
194        // redundant. If yes, it returns true, notifying the thread
195        // that it should exit.
196        private synchronized boolean waitForStart(SelectThread thread) {
197            while (true) {
198                while (runsCounter == thread.lastRun) {
199                    try {
200                        startLock.wait();
201                    } catch (InterruptedException e) {
202                        Thread.currentThread().interrupt();
203                    }
204                }
205                if (thread.isZombie()) { // redundant thread
206                    return true; // will cause run() to exit.
207                } else {
208                    thread.lastRun = runsCounter; // update lastRun
209                    return false; //   will cause run() to poll.
210                }
211            }
212        }
213    }
214
215    // Main thread waits on this lock, until all helper threads are done
216    // with poll().
217    private final FinishLock finishLock = new FinishLock();
218
219    private final class FinishLock  {
220        // Number of helper threads, that did not finish yet.
221        private int threadsToFinish;
222
223        // IOException which occurred during the last run.
224        IOException exception = null;
225
226        // Called before polling.
227        private void reset() {
228            threadsToFinish = threads.size(); // helper threads
229        }
230
231        // Each helper thread invokes this function on finishLock, when
232        // the thread is done with poll().
233        private synchronized void threadFinished() {
234            if (threadsToFinish == threads.size()) { // finished poll() first
235                // if finished first, wakeup others
236                wakeup();
237            }
238            threadsToFinish--;
239            if (threadsToFinish == 0) // all helper threads finished poll().
240                notify();             // notify the main thread
241        }
242
243        // The main thread invokes this function on finishLock to wait
244        // for helper threads to finish poll().
245        private synchronized void waitForHelperThreads() {
246            if (threadsToFinish == threads.size()) {
247                // no helper threads finished yet. Wakeup them up.
248                wakeup();
249            }
250            while (threadsToFinish != 0) {
251                try {
252                    finishLock.wait();
253                } catch (InterruptedException e) {
254                    // Interrupted - set interrupted state.
255                    Thread.currentThread().interrupt();
256                }
257            }
258        }
259
260        // sets IOException for this run
261        private synchronized void setException(IOException e) {
262            exception = e;
263        }
264
265        // Checks if there was any exception during the last run.
266        // If yes, throws it
267        private void checkForException() throws IOException {
268            if (exception == null)
269                return;
270            StringBuffer message =  new StringBuffer("An exception occurred" +
271                                       " during the execution of select(): \n");
272            message.append(exception);
273            message.append('\n');
274            exception = null;
275            throw new IOException(message.toString());
276        }
277    }
278
279    private final class SubSelector {
280        private final int pollArrayIndex; // starting index in pollArray to poll
281        // These arrays will hold result of native select().
282        // The first element of each array is the number of selected sockets.
283        // Other elements are file descriptors of selected sockets.
284        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
285        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
286        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
287
288        private SubSelector() {
289            this.pollArrayIndex = 0; // main thread
290        }
291
292        private SubSelector(int threadIndex) { // helper threads
293            this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
294        }
295
296        private int poll() throws IOException{ // poll for the main thread
297            return poll0(pollWrapper.pollArrayAddress,
298                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
299                         readFds, writeFds, exceptFds, timeout);
300        }
301
302        private int poll(int index) throws IOException {
303            // poll for helper threads
304            return  poll0(pollWrapper.pollArrayAddress +
305                     (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
306                     Math.min(MAX_SELECTABLE_FDS,
307                             totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
308                     readFds, writeFds, exceptFds, timeout);
309        }
310
311        private native int poll0(long pollAddress, int numfds,
312             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
313
314        private int processSelectedKeys(long updateCount) {
315            int numKeysUpdated = 0;
316            numKeysUpdated += processFDSet(updateCount, readFds,
317                                           Net.POLLIN,
318                                           false);
319            numKeysUpdated += processFDSet(updateCount, writeFds,
320                                           Net.POLLCONN |
321                                           Net.POLLOUT,
322                                           false);
323            numKeysUpdated += processFDSet(updateCount, exceptFds,
324                                           Net.POLLIN |
325                                           Net.POLLCONN |
326                                           Net.POLLOUT,
327                                           true);
328            return numKeysUpdated;
329        }
330
331        /**
332         * Note, clearedCount is used to determine if the readyOps have
333         * been reset in this select operation. updateCount is used to
334         * tell if a key has been counted as updated in this select
335         * operation.
336         *
337         * me.updateCount <= me.clearedCount <= updateCount
338         */
339        private int processFDSet(long updateCount, int[] fds, int rOps,
340                                 boolean isExceptFds)
341        {
342            int numKeysUpdated = 0;
343            for (int i = 1; i <= fds[0]; i++) {
344                int desc = fds[i];
345                if (desc == wakeupSourceFd) {
346                    synchronized (interruptLock) {
347                        interruptTriggered = true;
348                    }
349                    continue;
350                }
351                MapEntry me = fdMap.get(desc);
352                // If me is null, the key was deregistered in the previous
353                // processDeregisterQueue.
354                if (me == null)
355                    continue;
356                SelectionKeyImpl sk = me.ski;
357
358                // The descriptor may be in the exceptfds set because there is
359                // OOB data queued to the socket. If there is OOB data then it
360                // is discarded and the key is not added to the selected set.
361                if (isExceptFds &&
362                    (sk.channel() instanceof SocketChannelImpl) &&
363                    discardUrgentData(desc))
364                {
365                    continue;
366                }
367
368                if (selectedKeys.contains(sk)) { // Key in selected set
369                    if (me.clearedCount != updateCount) {
370                        if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
371                            (me.updateCount != updateCount)) {
372                            me.updateCount = updateCount;
373                            numKeysUpdated++;
374                        }
375                    } else { // The readyOps have been set; now add
376                        if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
377                            (me.updateCount != updateCount)) {
378                            me.updateCount = updateCount;
379                            numKeysUpdated++;
380                        }
381                    }
382                    me.clearedCount = updateCount;
383                } else { // Key is not in selected set yet
384                    if (me.clearedCount != updateCount) {
385                        sk.channel.translateAndSetReadyOps(rOps, sk);
386                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
387                            selectedKeys.add(sk);
388                            me.updateCount = updateCount;
389                            numKeysUpdated++;
390                        }
391                    } else { // The readyOps have been set; now add
392                        sk.channel.translateAndUpdateReadyOps(rOps, sk);
393                        if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
394                            selectedKeys.add(sk);
395                            me.updateCount = updateCount;
396                            numKeysUpdated++;
397                        }
398                    }
399                    me.clearedCount = updateCount;
400                }
401            }
402            return numKeysUpdated;
403        }
404    }
405
406    // Represents a helper thread used for select.
407    private final class SelectThread extends ManagedLocalsThread {
408        private final int index; // index of this thread
409        final SubSelector subSelector;
410        private long lastRun = 0; // last run number
411        private volatile boolean zombie;
412        // Creates a new thread
413        private SelectThread(int i) {
414            this.index = i;
415            this.subSelector = new SubSelector(i);
416            //make sure we wait for next round of poll
417            this.lastRun = startLock.runsCounter;
418        }
419        void makeZombie() {
420            zombie = true;
421        }
422        boolean isZombie() {
423            return zombie;
424        }
425        public void run() {
426            while (true) { // poll loop
427                // wait for the start of poll. If this thread has become
428                // redundant, then exit.
429                if (startLock.waitForStart(this))
430                    return;
431                // call poll()
432                try {
433                    subSelector.poll(index);
434                } catch (IOException e) {
435                    // Save this exception and let other threads finish.
436                    finishLock.setException(e);
437                }
438                // notify main thread, that this thread has finished, and
439                // wakeup others, if this thread is the first to finish.
440                finishLock.threadFinished();
441            }
442        }
443    }
444
445    // After some channels registered/deregistered, the number of required
446    // helper threads may have changed. Adjust this number.
447    private void adjustThreadsCount() {
448        if (threadsCount > threads.size()) {
449            // More threads needed. Start more threads.
450            for (int i = threads.size(); i < threadsCount; i++) {
451                SelectThread newThread = new SelectThread(i);
452                threads.add(newThread);
453                newThread.setDaemon(true);
454                newThread.start();
455            }
456        } else if (threadsCount < threads.size()) {
457            // Some threads become redundant. Remove them from the threads List.
458            for (int i = threads.size() - 1 ; i >= threadsCount; i--)
459                threads.remove(i).makeZombie();
460        }
461    }
462
463    // Sets Windows wakeup socket to a signaled state.
464    private void setWakeupSocket() {
465        setWakeupSocket0(wakeupSinkFd);
466    }
467    private native void setWakeupSocket0(int wakeupSinkFd);
468
469    // Sets Windows wakeup socket to a non-signaled state.
470    private void resetWakeupSocket() {
471        synchronized (interruptLock) {
472            if (interruptTriggered == false)
473                return;
474            resetWakeupSocket0(wakeupSourceFd);
475            interruptTriggered = false;
476        }
477    }
478
479    private native void resetWakeupSocket0(int wakeupSourceFd);
480
481    private native boolean discardUrgentData(int fd);
482
483    // We increment this counter on each call to updateSelectedKeys()
484    // each entry in  SubSelector.fdsMap has a memorized value of
485    // updateCount. When we increment numKeysUpdated we set updateCount
486    // for the corresponding entry to its current value. This is used to
487    // avoid counting the same key more than once - the same key can
488    // appear in readfds and writefds.
489    private long updateCount = 0;
490
491    // Update ops of the corresponding Channels. Add the ready keys to the
492    // ready queue.
493    private int updateSelectedKeys() {
494        updateCount++;
495        int numKeysUpdated = 0;
496        numKeysUpdated += subSelector.processSelectedKeys(updateCount);
497        for (SelectThread t: threads) {
498            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
499        }
500        return numKeysUpdated;
501    }
502
503    protected void implClose() throws IOException {
504        synchronized (closeLock) {
505            if (channelArray != null) {
506                if (pollWrapper != null) {
507                    // prevent further wakeup
508                    synchronized (interruptLock) {
509                        interruptTriggered = true;
510                    }
511                    wakeupPipe.sink().close();
512                    wakeupPipe.source().close();
513                    for(int i = 1; i < totalChannels; i++) { // Deregister channels
514                        if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
515                            deregister(channelArray[i]);
516                            SelectableChannel selch = channelArray[i].channel();
517                            if (!selch.isOpen() && !selch.isRegistered())
518                                ((SelChImpl)selch).kill();
519                        }
520                    }
521                    pollWrapper.free();
522                    pollWrapper = null;
523                    selectedKeys = null;
524                    channelArray = null;
525                    // Make all remaining helper threads exit
526                    for (SelectThread t: threads)
527                         t.makeZombie();
528                    startLock.startThreads();
529                }
530            }
531        }
532    }
533
534    protected void implRegister(SelectionKeyImpl ski) {
535        synchronized (closeLock) {
536            if (pollWrapper == null)
537                throw new ClosedSelectorException();
538            growIfNeeded();
539            channelArray[totalChannels] = ski;
540            ski.setIndex(totalChannels);
541            fdMap.put(ski);
542            keys.add(ski);
543            pollWrapper.addEntry(totalChannels, ski);
544            totalChannels++;
545        }
546    }
547
548    private void growIfNeeded() {
549        if (channelArray.length == totalChannels) {
550            int newSize = totalChannels * 2; // Make a larger array
551            SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
552            System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
553            channelArray = temp;
554            pollWrapper.grow(newSize);
555        }
556        if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
557            pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
558            totalChannels++;
559            threadsCount++;
560        }
561    }
562
563    protected void implDereg(SelectionKeyImpl ski) throws IOException{
564        int i = ski.getIndex();
565        assert (i >= 0);
566        synchronized (closeLock) {
567            if (i != totalChannels - 1) {
568                // Copy end one over it
569                SelectionKeyImpl endChannel = channelArray[totalChannels-1];
570                channelArray[i] = endChannel;
571                endChannel.setIndex(i);
572                pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
573                                                                pollWrapper, i);
574            }
575            ski.setIndex(-1);
576        }
577        channelArray[totalChannels - 1] = null;
578        totalChannels--;
579        if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
580            totalChannels--;
581            threadsCount--; // The last thread has become redundant.
582        }
583        fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
584        keys.remove(ski);
585        selectedKeys.remove(ski);
586        deregister(ski);
587        SelectableChannel selch = ski.channel();
588        if (!selch.isOpen() && !selch.isRegistered())
589            ((SelChImpl)selch).kill();
590    }
591
592    public void putEventOps(SelectionKeyImpl sk, int ops) {
593        synchronized (closeLock) {
594            if (pollWrapper == null)
595                throw new ClosedSelectorException();
596            // make sure this sk has not been removed yet
597            int index = sk.getIndex();
598            if (index == -1)
599                throw new CancelledKeyException();
600            pollWrapper.putEventOps(index, ops);
601        }
602    }
603
604    public Selector wakeup() {
605        synchronized (interruptLock) {
606            if (!interruptTriggered) {
607                setWakeupSocket();
608                interruptTriggered = true;
609            }
610        }
611        return this;
612    }
613
614    static {
615        IOUtil.load();
616    }
617}
618