// WindowsSelectorImpl.java revision 12745:f068a4ffddd2
1/* 2 * Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26/* 27 */ 28 29 30package sun.nio.ch; 31 32import java.nio.channels.spi.SelectorProvider; 33import java.nio.channels.Selector; 34import java.nio.channels.ClosedSelectorException; 35import java.nio.channels.Pipe; 36import java.nio.channels.SelectableChannel; 37import java.io.IOException; 38import java.nio.channels.CancelledKeyException; 39import java.util.List; 40import java.util.ArrayList; 41import java.util.HashMap; 42import java.util.Iterator; 43import sun.misc.ManagedLocalsThread; 44 45/** 46 * A multi-threaded implementation of Selector for Windows. 
47 * 48 * @author Konstantin Kladko 49 * @author Mark Reinhold 50 */ 51 52final class WindowsSelectorImpl extends SelectorImpl { 53 // Initial capacity of the poll array 54 private final int INIT_CAP = 8; 55 // Maximum number of sockets for select(). 56 // Should be INIT_CAP times a power of 2 57 private static final int MAX_SELECTABLE_FDS = 1024; 58 59 // The list of SelectableChannels serviced by this Selector. Every mod 60 // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll 61 // array, where the corresponding entry is occupied by the wakeupSocket 62 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP]; 63 64 // The global native poll array holds file decriptors and event masks 65 private PollArrayWrapper pollWrapper; 66 67 // The number of valid entries in poll array, including entries occupied 68 // by wakeup socket handle. 69 private int totalChannels = 1; 70 71 // Number of helper threads needed for select. We need one thread per 72 // each additional set of MAX_SELECTABLE_FDS - 1 channels. 73 private int threadsCount = 0; 74 75 // A list of helper threads for select. 76 private final List<SelectThread> threads = new ArrayList<SelectThread>(); 77 78 //Pipe used as a wakeup object. 
79 private final Pipe wakeupPipe; 80 81 // File descriptors corresponding to source and sink 82 private final int wakeupSourceFd, wakeupSinkFd; 83 84 // Lock for close cleanup 85 private Object closeLock = new Object(); 86 87 // Maps file descriptors to their indices in pollArray 88 private static final class FdMap extends HashMap<Integer, MapEntry> { 89 static final long serialVersionUID = 0L; 90 private MapEntry get(int desc) { 91 return get(new Integer(desc)); 92 } 93 private MapEntry put(SelectionKeyImpl ski) { 94 return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski)); 95 } 96 private MapEntry remove(SelectionKeyImpl ski) { 97 Integer fd = new Integer(ski.channel.getFDVal()); 98 MapEntry x = get(fd); 99 if ((x != null) && (x.ski.channel == ski.channel)) 100 return remove(fd); 101 return null; 102 } 103 } 104 105 // class for fdMap entries 106 private static final class MapEntry { 107 SelectionKeyImpl ski; 108 long updateCount = 0; 109 long clearedCount = 0; 110 MapEntry(SelectionKeyImpl ski) { 111 this.ski = ski; 112 } 113 } 114 private final FdMap fdMap = new FdMap(); 115 116 // SubSelector for the main thread 117 private final SubSelector subSelector = new SubSelector(); 118 119 private long timeout; //timeout for poll 120 121 // Lock for interrupt triggering and clearing 122 private final Object interruptLock = new Object(); 123 private volatile boolean interruptTriggered = false; 124 125 WindowsSelectorImpl(SelectorProvider sp) throws IOException { 126 super(sp); 127 pollWrapper = new PollArrayWrapper(INIT_CAP); 128 wakeupPipe = Pipe.open(); 129 wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); 130 131 // Disable the Nagle algorithm so that the wakeup is more immediate 132 SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); 133 (sink.sc).socket().setTcpNoDelay(true); 134 wakeupSinkFd = ((SelChImpl)sink).getFDVal(); 135 136 pollWrapper.addWakeupSocket(wakeupSourceFd, 0); 137 } 138 139 protected int doSelect(long timeout) throws 
IOException { 140 if (channelArray == null) 141 throw new ClosedSelectorException(); 142 this.timeout = timeout; // set selector timeout 143 processDeregisterQueue(); 144 if (interruptTriggered) { 145 resetWakeupSocket(); 146 return 0; 147 } 148 // Calculate number of helper threads needed for poll. If necessary 149 // threads are created here and start waiting on startLock 150 adjustThreadsCount(); 151 finishLock.reset(); // reset finishLock 152 // Wakeup helper threads, waiting on startLock, so they start polling. 153 // Redundant threads will exit here after wakeup. 154 startLock.startThreads(); 155 // do polling in the main thread. Main thread is responsible for 156 // first MAX_SELECTABLE_FDS entries in pollArray. 157 try { 158 begin(); 159 try { 160 subSelector.poll(); 161 } catch (IOException e) { 162 finishLock.setException(e); // Save this exception 163 } 164 // Main thread is out of poll(). Wakeup others and wait for them 165 if (threads.size() > 0) 166 finishLock.waitForHelperThreads(); 167 } finally { 168 end(); 169 } 170 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 171 finishLock.checkForException(); 172 processDeregisterQueue(); 173 int updated = updateSelectedKeys(); 174 // Done with poll(). Set wakeupSocket to nonsignaled for the next run. 175 resetWakeupSocket(); 176 return updated; 177 } 178 179 // Helper threads wait on this lock for the next poll. 180 private final StartLock startLock = new StartLock(); 181 182 private final class StartLock { 183 // A variable which distinguishes the current run of doSelect from the 184 // previous one. Incrementing runsCounter and notifying threads will 185 // trigger another round of poll. 186 private long runsCounter; 187 // Triggers threads, waiting on this lock to start polling. 188 private synchronized void startThreads() { 189 runsCounter++; // next run 190 notifyAll(); // wake up threads. 
191 } 192 // This function is called by a helper thread to wait for the 193 // next round of poll(). It also checks, if this thread became 194 // redundant. If yes, it returns true, notifying the thread 195 // that it should exit. 196 private synchronized boolean waitForStart(SelectThread thread) { 197 while (true) { 198 while (runsCounter == thread.lastRun) { 199 try { 200 startLock.wait(); 201 } catch (InterruptedException e) { 202 Thread.currentThread().interrupt(); 203 } 204 } 205 if (thread.isZombie()) { // redundant thread 206 return true; // will cause run() to exit. 207 } else { 208 thread.lastRun = runsCounter; // update lastRun 209 return false; // will cause run() to poll. 210 } 211 } 212 } 213 } 214 215 // Main thread waits on this lock, until all helper threads are done 216 // with poll(). 217 private final FinishLock finishLock = new FinishLock(); 218 219 private final class FinishLock { 220 // Number of helper threads, that did not finish yet. 221 private int threadsToFinish; 222 223 // IOException which occurred during the last run. 224 IOException exception = null; 225 226 // Called before polling. 227 private void reset() { 228 threadsToFinish = threads.size(); // helper threads 229 } 230 231 // Each helper thread invokes this function on finishLock, when 232 // the thread is done with poll(). 233 private synchronized void threadFinished() { 234 if (threadsToFinish == threads.size()) { // finished poll() first 235 // if finished first, wakeup others 236 wakeup(); 237 } 238 threadsToFinish--; 239 if (threadsToFinish == 0) // all helper threads finished poll(). 240 notify(); // notify the main thread 241 } 242 243 // The main thread invokes this function on finishLock to wait 244 // for helper threads to finish poll(). 245 private synchronized void waitForHelperThreads() { 246 if (threadsToFinish == threads.size()) { 247 // no helper threads finished yet. Wakeup them up. 
248 wakeup(); 249 } 250 while (threadsToFinish != 0) { 251 try { 252 finishLock.wait(); 253 } catch (InterruptedException e) { 254 // Interrupted - set interrupted state. 255 Thread.currentThread().interrupt(); 256 } 257 } 258 } 259 260 // sets IOException for this run 261 private synchronized void setException(IOException e) { 262 exception = e; 263 } 264 265 // Checks if there was any exception during the last run. 266 // If yes, throws it 267 private void checkForException() throws IOException { 268 if (exception == null) 269 return; 270 StringBuffer message = new StringBuffer("An exception occurred" + 271 " during the execution of select(): \n"); 272 message.append(exception); 273 message.append('\n'); 274 exception = null; 275 throw new IOException(message.toString()); 276 } 277 } 278 279 private final class SubSelector { 280 private final int pollArrayIndex; // starting index in pollArray to poll 281 // These arrays will hold result of native select(). 282 // The first element of each array is the number of selected sockets. 283 // Other elements are file descriptors of selected sockets. 
284 private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1]; 285 private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; 286 private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; 287 288 private SubSelector() { 289 this.pollArrayIndex = 0; // main thread 290 } 291 292 private SubSelector(int threadIndex) { // helper threads 293 this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS; 294 } 295 296 private int poll() throws IOException{ // poll for the main thread 297 return poll0(pollWrapper.pollArrayAddress, 298 Math.min(totalChannels, MAX_SELECTABLE_FDS), 299 readFds, writeFds, exceptFds, timeout); 300 } 301 302 private int poll(int index) throws IOException { 303 // poll for helper threads 304 return poll0(pollWrapper.pollArrayAddress + 305 (pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), 306 Math.min(MAX_SELECTABLE_FDS, 307 totalChannels - (index + 1) * MAX_SELECTABLE_FDS), 308 readFds, writeFds, exceptFds, timeout); 309 } 310 311 private native int poll0(long pollAddress, int numfds, 312 int[] readFds, int[] writeFds, int[] exceptFds, long timeout); 313 314 private int processSelectedKeys(long updateCount) { 315 int numKeysUpdated = 0; 316 numKeysUpdated += processFDSet(updateCount, readFds, 317 Net.POLLIN, 318 false); 319 numKeysUpdated += processFDSet(updateCount, writeFds, 320 Net.POLLCONN | 321 Net.POLLOUT, 322 false); 323 numKeysUpdated += processFDSet(updateCount, exceptFds, 324 Net.POLLIN | 325 Net.POLLCONN | 326 Net.POLLOUT, 327 true); 328 return numKeysUpdated; 329 } 330 331 /** 332 * Note, clearedCount is used to determine if the readyOps have 333 * been reset in this select operation. updateCount is used to 334 * tell if a key has been counted as updated in this select 335 * operation. 
336 * 337 * me.updateCount <= me.clearedCount <= updateCount 338 */ 339 private int processFDSet(long updateCount, int[] fds, int rOps, 340 boolean isExceptFds) 341 { 342 int numKeysUpdated = 0; 343 for (int i = 1; i <= fds[0]; i++) { 344 int desc = fds[i]; 345 if (desc == wakeupSourceFd) { 346 synchronized (interruptLock) { 347 interruptTriggered = true; 348 } 349 continue; 350 } 351 MapEntry me = fdMap.get(desc); 352 // If me is null, the key was deregistered in the previous 353 // processDeregisterQueue. 354 if (me == null) 355 continue; 356 SelectionKeyImpl sk = me.ski; 357 358 // The descriptor may be in the exceptfds set because there is 359 // OOB data queued to the socket. If there is OOB data then it 360 // is discarded and the key is not added to the selected set. 361 if (isExceptFds && 362 (sk.channel() instanceof SocketChannelImpl) && 363 discardUrgentData(desc)) 364 { 365 continue; 366 } 367 368 if (selectedKeys.contains(sk)) { // Key in selected set 369 if (me.clearedCount != updateCount) { 370 if (sk.channel.translateAndSetReadyOps(rOps, sk) && 371 (me.updateCount != updateCount)) { 372 me.updateCount = updateCount; 373 numKeysUpdated++; 374 } 375 } else { // The readyOps have been set; now add 376 if (sk.channel.translateAndUpdateReadyOps(rOps, sk) && 377 (me.updateCount != updateCount)) { 378 me.updateCount = updateCount; 379 numKeysUpdated++; 380 } 381 } 382 me.clearedCount = updateCount; 383 } else { // Key is not in selected set yet 384 if (me.clearedCount != updateCount) { 385 sk.channel.translateAndSetReadyOps(rOps, sk); 386 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { 387 selectedKeys.add(sk); 388 me.updateCount = updateCount; 389 numKeysUpdated++; 390 } 391 } else { // The readyOps have been set; now add 392 sk.channel.translateAndUpdateReadyOps(rOps, sk); 393 if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { 394 selectedKeys.add(sk); 395 me.updateCount = updateCount; 396 numKeysUpdated++; 397 } 398 } 399 me.clearedCount = 
updateCount; 400 } 401 } 402 return numKeysUpdated; 403 } 404 } 405 406 // Represents a helper thread used for select. 407 private final class SelectThread extends ManagedLocalsThread { 408 private final int index; // index of this thread 409 final SubSelector subSelector; 410 private long lastRun = 0; // last run number 411 private volatile boolean zombie; 412 // Creates a new thread 413 private SelectThread(int i) { 414 this.index = i; 415 this.subSelector = new SubSelector(i); 416 //make sure we wait for next round of poll 417 this.lastRun = startLock.runsCounter; 418 } 419 void makeZombie() { 420 zombie = true; 421 } 422 boolean isZombie() { 423 return zombie; 424 } 425 public void run() { 426 while (true) { // poll loop 427 // wait for the start of poll. If this thread has become 428 // redundant, then exit. 429 if (startLock.waitForStart(this)) 430 return; 431 // call poll() 432 try { 433 subSelector.poll(index); 434 } catch (IOException e) { 435 // Save this exception and let other threads finish. 436 finishLock.setException(e); 437 } 438 // notify main thread, that this thread has finished, and 439 // wakeup others, if this thread is the first to finish. 440 finishLock.threadFinished(); 441 } 442 } 443 } 444 445 // After some channels registered/deregistered, the number of required 446 // helper threads may have changed. Adjust this number. 447 private void adjustThreadsCount() { 448 if (threadsCount > threads.size()) { 449 // More threads needed. Start more threads. 450 for (int i = threads.size(); i < threadsCount; i++) { 451 SelectThread newThread = new SelectThread(i); 452 threads.add(newThread); 453 newThread.setDaemon(true); 454 newThread.start(); 455 } 456 } else if (threadsCount < threads.size()) { 457 // Some threads become redundant. Remove them from the threads List. 458 for (int i = threads.size() - 1 ; i >= threadsCount; i--) 459 threads.remove(i).makeZombie(); 460 } 461 } 462 463 // Sets Windows wakeup socket to a signaled state. 
464 private void setWakeupSocket() { 465 setWakeupSocket0(wakeupSinkFd); 466 } 467 private native void setWakeupSocket0(int wakeupSinkFd); 468 469 // Sets Windows wakeup socket to a non-signaled state. 470 private void resetWakeupSocket() { 471 synchronized (interruptLock) { 472 if (interruptTriggered == false) 473 return; 474 resetWakeupSocket0(wakeupSourceFd); 475 interruptTriggered = false; 476 } 477 } 478 479 private native void resetWakeupSocket0(int wakeupSourceFd); 480 481 private native boolean discardUrgentData(int fd); 482 483 // We increment this counter on each call to updateSelectedKeys() 484 // each entry in SubSelector.fdsMap has a memorized value of 485 // updateCount. When we increment numKeysUpdated we set updateCount 486 // for the corresponding entry to its current value. This is used to 487 // avoid counting the same key more than once - the same key can 488 // appear in readfds and writefds. 489 private long updateCount = 0; 490 491 // Update ops of the corresponding Channels. Add the ready keys to the 492 // ready queue. 
493 private int updateSelectedKeys() { 494 updateCount++; 495 int numKeysUpdated = 0; 496 numKeysUpdated += subSelector.processSelectedKeys(updateCount); 497 for (SelectThread t: threads) { 498 numKeysUpdated += t.subSelector.processSelectedKeys(updateCount); 499 } 500 return numKeysUpdated; 501 } 502 503 protected void implClose() throws IOException { 504 synchronized (closeLock) { 505 if (channelArray != null) { 506 if (pollWrapper != null) { 507 // prevent further wakeup 508 synchronized (interruptLock) { 509 interruptTriggered = true; 510 } 511 wakeupPipe.sink().close(); 512 wakeupPipe.source().close(); 513 for(int i = 1; i < totalChannels; i++) { // Deregister channels 514 if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent 515 deregister(channelArray[i]); 516 SelectableChannel selch = channelArray[i].channel(); 517 if (!selch.isOpen() && !selch.isRegistered()) 518 ((SelChImpl)selch).kill(); 519 } 520 } 521 pollWrapper.free(); 522 pollWrapper = null; 523 selectedKeys = null; 524 channelArray = null; 525 // Make all remaining helper threads exit 526 for (SelectThread t: threads) 527 t.makeZombie(); 528 startLock.startThreads(); 529 } 530 } 531 } 532 } 533 534 protected void implRegister(SelectionKeyImpl ski) { 535 synchronized (closeLock) { 536 if (pollWrapper == null) 537 throw new ClosedSelectorException(); 538 growIfNeeded(); 539 channelArray[totalChannels] = ski; 540 ski.setIndex(totalChannels); 541 fdMap.put(ski); 542 keys.add(ski); 543 pollWrapper.addEntry(totalChannels, ski); 544 totalChannels++; 545 } 546 } 547 548 private void growIfNeeded() { 549 if (channelArray.length == totalChannels) { 550 int newSize = totalChannels * 2; // Make a larger array 551 SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; 552 System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); 553 channelArray = temp; 554 pollWrapper.grow(newSize); 555 } 556 if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed 557 
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); 558 totalChannels++; 559 threadsCount++; 560 } 561 } 562 563 protected void implDereg(SelectionKeyImpl ski) throws IOException{ 564 int i = ski.getIndex(); 565 assert (i >= 0); 566 synchronized (closeLock) { 567 if (i != totalChannels - 1) { 568 // Copy end one over it 569 SelectionKeyImpl endChannel = channelArray[totalChannels-1]; 570 channelArray[i] = endChannel; 571 endChannel.setIndex(i); 572 pollWrapper.replaceEntry(pollWrapper, totalChannels - 1, 573 pollWrapper, i); 574 } 575 ski.setIndex(-1); 576 } 577 channelArray[totalChannels - 1] = null; 578 totalChannels--; 579 if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) { 580 totalChannels--; 581 threadsCount--; // The last thread has become redundant. 582 } 583 fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys 584 keys.remove(ski); 585 selectedKeys.remove(ski); 586 deregister(ski); 587 SelectableChannel selch = ski.channel(); 588 if (!selch.isOpen() && !selch.isRegistered()) 589 ((SelChImpl)selch).kill(); 590 } 591 592 public void putEventOps(SelectionKeyImpl sk, int ops) { 593 synchronized (closeLock) { 594 if (pollWrapper == null) 595 throw new ClosedSelectorException(); 596 // make sure this sk has not been removed yet 597 int index = sk.getIndex(); 598 if (index == -1) 599 throw new CancelledKeyException(); 600 pollWrapper.putEventOps(index, ops); 601 } 602 } 603 604 public Selector wakeup() { 605 synchronized (interruptLock) { 606 if (!interruptTriggered) { 607 setWakeupSocket(); 608 interruptTriggered = true; 609 } 610 } 611 return this; 612 } 613 614 static { 615 IOUtil.load(); 616 } 617} 618