SelectorImpl.java revision 758:bb6bf34f121f
1/* 2 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26package com.sun.corba.se.impl.transport; 27 28import java.io.IOException; 29import java.net.ServerSocket; 30import java.nio.channels.ClosedChannelException; 31import java.nio.channels.SelectableChannel; 32import java.nio.channels.ServerSocketChannel; 33import java.nio.channels.SelectionKey; 34import java.nio.channels.Selector; 35import java.nio.channels.ClosedSelectorException; 36import java.util.ArrayList; 37import java.util.HashMap; 38import java.util.Map; 39import java.util.Iterator; 40import java.util.List; 41 42 43import com.sun.corba.se.pept.broker.Broker; 44import com.sun.corba.se.pept.transport.Acceptor; 45import com.sun.corba.se.pept.transport.Connection; 46import com.sun.corba.se.pept.transport.EventHandler; 47import com.sun.corba.se.pept.transport.ListenerThread; 48import com.sun.corba.se.pept.transport.ReaderThread; 49 50import com.sun.corba.se.spi.logging.CORBALogDomains; 51import com.sun.corba.se.spi.orb.ORB; 52import com.sun.corba.se.spi.orbutil.threadpool.Work; 53import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 54import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 55 56import com.sun.corba.se.impl.logging.ORBUtilSystemException; 57import com.sun.corba.se.impl.orbutil.ORBUtility; 58 59/** 60 * @author Harold Carr 61 */ 62class SelectorImpl 63 extends 64 Thread 65 implements 66 com.sun.corba.se.pept.transport.Selector 67{ 68 private ORB orb; 69 private Selector selector; 70 private long timeout; 71 private List deferredRegistrations; 72 private List interestOpsList; 73 private HashMap listenerThreads; 74 private Map readerThreads; 75 private boolean selectorStarted; 76 private volatile boolean closed; 77 private ORBUtilSystemException wrapper; 78 79 80 public SelectorImpl(ORB orb) 81 { 82 super(null, null, "ORB-Selector-Thread", 0, false); 83 this.orb = orb; 84 selector = null; 85 selectorStarted = false; 86 timeout = 60000; 87 deferredRegistrations = new ArrayList(); 88 interestOpsList = new ArrayList(); 89 listenerThreads = new HashMap(); 90 readerThreads = java.util.Collections.synchronizedMap(new HashMap()); 91 closed = false; 92 wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT); 93 } 94 95 public void setTimeout(long timeout) 96 { 97 this.timeout = timeout; 98 } 99 100 public long getTimeout() 101 { 102 return timeout; 103 } 104 105 public void registerInterestOps(EventHandler eventHandler) 106 { 107 if (orb.transportDebugFlag) { 108 dprint(".registerInterestOps:-> " + eventHandler); 109 } 110 111 SelectionKey selectionKey = eventHandler.getSelectionKey(); 112 if (selectionKey.isValid()) { 113 int ehOps = eventHandler.getInterestOps(); 114 SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps); 115 synchronized(interestOpsList) { 116 interestOpsList.add(keyAndOp); 117 } 118 // tell Selector Thread there's an update to a SelectorKey's Ops 119 try { 120 if (selector != null) { 121 // wakeup Selector thread to process close request 122 selector.wakeup(); 123 } 124 } catch (Throwable t) { 125 if (orb.transportDebugFlag) { 126 dprint(".registerInterestOps: selector.wakeup: ", t); 127 } 128 } 129 } 130 else { 131 wrapper.selectionKeyInvalid(eventHandler.toString()); 132 if (orb.transportDebugFlag) { 133 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler); 134 } 135 } 136 137 if (orb.transportDebugFlag) { 138 dprint(".registerInterestOps:<- "); 139 } 140 } 141 142 public void registerForEvent(EventHandler eventHandler) 143 { 144 if (orb.transportDebugFlag) { 145 dprint(".registerForEvent: " + eventHandler); 146 } 147 148 if (isClosed()) { 149 if (orb.transportDebugFlag) { 150 dprint(".registerForEvent: closed: " + eventHandler); 151 } 152 return; 153 } 154 155 if (eventHandler.shouldUseSelectThreadToWait()) { 156 synchronized (deferredRegistrations) { 157 deferredRegistrations.add(eventHandler); 158 } 159 if (! selectorStarted) { 160 startSelector(); 161 } 162 selector.wakeup(); 163 return; 164 } 165 166 switch (eventHandler.getInterestOps()) { 167 case SelectionKey.OP_ACCEPT : 168 createListenerThread(eventHandler); 169 break; 170 case SelectionKey.OP_READ : 171 createReaderThread(eventHandler); 172 break; 173 default: 174 if (orb.transportDebugFlag) { 175 dprint(".registerForEvent: default: " + eventHandler); 176 } 177 throw new RuntimeException( 178 "SelectorImpl.registerForEvent: unknown interest ops"); 179 } 180 } 181 182 public void unregisterForEvent(EventHandler eventHandler) 183 { 184 if (orb.transportDebugFlag) { 185 dprint(".unregisterForEvent: " + eventHandler); 186 } 187 188 if (isClosed()) { 189 if (orb.transportDebugFlag) { 190 dprint(".unregisterForEvent: closed: " + eventHandler); 191 } 192 return; 193 } 194 195 if (eventHandler.shouldUseSelectThreadToWait()) { 196 SelectionKey selectionKey ; 197 synchronized(deferredRegistrations) { 198 selectionKey = eventHandler.getSelectionKey(); 199 } 200 if (selectionKey != null) { 201 selectionKey.cancel(); 202 } 203 if (selector != null) { 204 selector.wakeup(); 205 } 206 return; 207 } 208 209 switch (eventHandler.getInterestOps()) { 210 case SelectionKey.OP_ACCEPT : 211 destroyListenerThread(eventHandler); 212 break; 213 case SelectionKey.OP_READ : 214 destroyReaderThread(eventHandler); 215 break; 216 default: 217 if (orb.transportDebugFlag) { 218 dprint(".unregisterForEvent: default: " + eventHandler); 219 } 220 throw new RuntimeException( 221 "SelectorImpl.uregisterForEvent: unknown interest ops"); 222 } 223 } 224 225 public void close() 226 { 227 if (orb.transportDebugFlag) { 228 dprint(".close"); 229 } 230 231 if (isClosed()) { 232 if (orb.transportDebugFlag) { 233 dprint(".close: already closed"); 234 } 235 return; 236 } 237 238 setClosed(true); 239 240 Iterator i; 241 242 // Kill listeners. 243 244 i = listenerThreads.values().iterator(); 245 while (i.hasNext()) { 246 ListenerThread listenerThread = (ListenerThread) i.next(); 247 listenerThread.close(); 248 } 249 250 // Kill readers. 251 252 i = readerThreads.values().iterator(); 253 while (i.hasNext()) { 254 ReaderThread readerThread = (ReaderThread) i.next(); 255 readerThread.close(); 256 } 257 258 clearDeferredRegistrations(); 259 260 // Selector 261 262 try { 263 if (selector != null) { 264 // wakeup Selector thread to process close request 265 selector.wakeup(); 266 } 267 } catch (Throwable t) { 268 if (orb.transportDebugFlag) { 269 dprint(".close: selector.wakeup: ", t); 270 } 271 } 272 } 273 274 /////////////////////////////////////////////////// 275 // 276 // Thread methods. 277 // 278 279 public void run() 280 { 281 while (!closed) { 282 try { 283 int n = 0; 284 if (timeout == 0 && orb.transportDebugFlag) { 285 dprint(".run: Beginning of selection cycle"); 286 } 287 handleDeferredRegistrations(); 288 enableInterestOps(); 289 try { 290 n = selector.select(timeout); 291 } catch (IOException e) { 292 if (orb.transportDebugFlag) { 293 dprint(".run: selector.select: ", e); 294 } 295 } catch (ClosedSelectorException csEx) { 296 if (orb.transportDebugFlag) { 297 dprint(".run: selector.select: ", csEx); 298 } 299 break; 300 } 301 if (closed) { 302 break; 303 } 304 /* 305 if (timeout == 0 && orb.transportDebugFlag) { 306 dprint(".run: selector.select() returned: " + n); 307 } 308 if (n == 0) { 309 continue; 310 } 311 */ 312 Iterator iterator = selector.selectedKeys().iterator(); 313 if (orb.transportDebugFlag) { 314 if (iterator.hasNext()) { 315 dprint(".run: n = " + n); 316 } 317 } 318 while (iterator.hasNext()) { 319 SelectionKey selectionKey = (SelectionKey) iterator.next(); 320 iterator.remove(); 321 EventHandler eventHandler = (EventHandler) 322 selectionKey.attachment(); 323 try { 324 eventHandler.handleEvent(); 325 } catch (Throwable t) { 326 if (orb.transportDebugFlag) { 327 dprint(".run: eventHandler.handleEvent", t); 328 } 329 } 330 } 331 if (timeout == 0 && orb.transportDebugFlag) { 332 dprint(".run: End of selection cycle"); 333 } 334 } catch (Throwable t) { 335 // IMPORTANT: ignore all errors so the select thread keeps running. 336 // Otherwise a guaranteed hang. 337 if (orb.transportDebugFlag) { 338 dprint(".run: ignoring", t); 339 } 340 } 341 } 342 try { 343 if (selector != null) { 344 if (orb.transportDebugFlag) { 345 dprint(".run: selector.close "); 346 } 347 selector.close(); 348 } 349 } catch (Throwable t) { 350 if (orb.transportDebugFlag) { 351 dprint(".run: selector.close: ", t); 352 } 353 } 354 } 355 356 ///////////////////////////////////////////////////// 357 // 358 // Implementation. 359 // 360 361 private void clearDeferredRegistrations() { 362 synchronized (deferredRegistrations) { 363 int deferredListSize = deferredRegistrations.size(); 364 if (orb.transportDebugFlag) { 365 dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize); 366 } 367 for (int i = 0; i < deferredListSize; i++) { 368 EventHandler eventHandler = 369 (EventHandler)deferredRegistrations.get(i); 370 if (orb.transportDebugFlag) { 371 dprint(".clearDeferredRegistrations: " + eventHandler); 372 } 373 SelectableChannel channel = eventHandler.getChannel(); 374 SelectionKey selectionKey = null; 375 376 try { 377 if (orb.transportDebugFlag) { 378 dprint(".clearDeferredRegistrations:close channel == " 379 + channel); 380 dprint(".clearDeferredRegistrations:close channel class == " 381 + channel.getClass().getName()); 382 } 383 channel.close(); 384 selectionKey = eventHandler.getSelectionKey(); 385 if (selectionKey != null) { 386 selectionKey.cancel(); 387 selectionKey.attach(null); 388 } 389 } catch (IOException ioEx) { 390 if (orb.transportDebugFlag) { 391 dprint(".clearDeferredRegistrations: ", ioEx); 392 } 393 } 394 } 395 deferredRegistrations.clear(); 396 } 397 } 398 399 private synchronized boolean isClosed () 400 { 401 return closed; 402 } 403 404 private synchronized void setClosed(boolean closed) 405 { 406 this.closed = closed; 407 } 408 409 private void startSelector() 410 { 411 try { 412 selector = Selector.open(); 413 } catch (IOException e) { 414 if (orb.transportDebugFlag) { 415 dprint(".startSelector: Selector.open: IOException: ", e); 416 } 417 // REVISIT - better handling/reporting 418 RuntimeException rte = 419 new RuntimeException(".startSelector: Selector.open exception"); 420 rte.initCause(e); 421 throw rte; 422 } 423 setDaemon(true); 424 start(); 425 selectorStarted = true; 426 if (orb.transportDebugFlag) { 427 dprint(".startSelector: selector.start completed."); 428 } 429 } 430 431 private void handleDeferredRegistrations() 432 { 433 synchronized (deferredRegistrations) { 434 int deferredListSize = deferredRegistrations.size(); 435 for (int i = 0; i < deferredListSize; i++) { 436 EventHandler eventHandler = 437 (EventHandler)deferredRegistrations.get(i); 438 if (orb.transportDebugFlag) { 439 dprint(".handleDeferredRegistrations: " + eventHandler); 440 } 441 SelectableChannel channel = eventHandler.getChannel(); 442 SelectionKey selectionKey = null; 443 try { 444 selectionKey = 445 channel.register(selector, 446 eventHandler.getInterestOps(), 447 (Object)eventHandler); 448 } catch (ClosedChannelException e) { 449 if (orb.transportDebugFlag) { 450 dprint(".handleDeferredRegistrations: ", e); 451 } 452 } 453 eventHandler.setSelectionKey(selectionKey); 454 } 455 deferredRegistrations.clear(); 456 } 457 } 458 459 private void enableInterestOps() 460 { 461 synchronized (interestOpsList) { 462 int listSize = interestOpsList.size(); 463 if (listSize > 0) { 464 if (orb.transportDebugFlag) { 465 dprint(".enableInterestOps:->"); 466 } 467 SelectionKey selectionKey = null; 468 SelectionKeyAndOp keyAndOp = null; 469 int keyOp, selectionKeyOps = 0; 470 for (int i = 0; i < listSize; i++) { 471 keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i); 472 selectionKey = keyAndOp.selectionKey; 473 474 // Need to check if the SelectionKey is valid because a 475 // connection's SelectionKey could be put on the list to 476 // have its OP enabled and before it's enabled be reclaimed. 477 // Otherwise, the enabling of the OP will throw an exception 478 // here and exit this method an potentially not enable all 479 // registered ops. 480 // 481 // So, we ignore SelectionKeys that are invalid. They will get 482 // cleaned up on the next Selector.select() call. 483 484 if (selectionKey.isValid()) { 485 if (orb.transportDebugFlag) { 486 dprint(".enableInterestOps: " + keyAndOp); 487 } 488 keyOp = keyAndOp.keyOp; 489 selectionKeyOps = selectionKey.interestOps(); 490 selectionKey.interestOps(selectionKeyOps | keyOp); 491 } 492 } 493 interestOpsList.clear(); 494 if (orb.transportDebugFlag) { 495 dprint(".enableInterestOps:<-"); 496 } 497 } 498 } 499 } 500 501 private void createListenerThread(EventHandler eventHandler) 502 { 503 if (orb.transportDebugFlag) { 504 dprint(".createListenerThread: " + eventHandler); 505 } 506 Acceptor acceptor = eventHandler.getAcceptor(); 507 ListenerThread listenerThread = 508 new ListenerThreadImpl(orb, acceptor, this); 509 listenerThreads.put(eventHandler, listenerThread); 510 Throwable throwable = null; 511 try { 512 orb.getThreadPoolManager().getThreadPool(0) 513 .getWorkQueue(0).addWork((Work)listenerThread); 514 } catch (NoSuchThreadPoolException e) { 515 throwable = e; 516 } catch (NoSuchWorkQueueException e) { 517 throwable = e; 518 } 519 if (throwable != null) { 520 RuntimeException rte = new RuntimeException(throwable.toString()); 521 rte.initCause(throwable); 522 throw rte; 523 } 524 } 525 526 private void destroyListenerThread(EventHandler eventHandler) 527 { 528 if (orb.transportDebugFlag) { 529 dprint(".destroyListenerThread: " + eventHandler); 530 } 531 ListenerThread listenerThread = (ListenerThread) 532 listenerThreads.get(eventHandler); 533 if (listenerThread == null) { 534 if (orb.transportDebugFlag) { 535 dprint(".destroyListenerThread: cannot find ListenerThread - ignoring."); 536 } 537 return; 538 } 539 listenerThreads.remove(eventHandler); 540 listenerThread.close(); 541 } 542 543 private void createReaderThread(EventHandler eventHandler) 544 { 545 if (orb.transportDebugFlag) { 546 dprint(".createReaderThread: " + eventHandler); 547 } 548 Connection connection = eventHandler.getConnection(); 549 ReaderThread readerThread = 550 new ReaderThreadImpl(orb, connection, this); 551 readerThreads.put(eventHandler, readerThread); 552 Throwable throwable = null; 553 try { 554 orb.getThreadPoolManager().getThreadPool(0) 555 .getWorkQueue(0).addWork((Work)readerThread); 556 } catch (NoSuchThreadPoolException e) { 557 throwable = e; 558 } catch (NoSuchWorkQueueException e) { 559 throwable = e; 560 } 561 if (throwable != null) { 562 RuntimeException rte = new RuntimeException(throwable.toString()); 563 rte.initCause(throwable); 564 throw rte; 565 } 566 } 567 568 private void destroyReaderThread(EventHandler eventHandler) 569 { 570 if (orb.transportDebugFlag) { 571 dprint(".destroyReaderThread: " + eventHandler); 572 } 573 ReaderThread readerThread = (ReaderThread) 574 readerThreads.get(eventHandler); 575 if (readerThread == null) { 576 if (orb.transportDebugFlag) { 577 dprint(".destroyReaderThread: cannot find ReaderThread - ignoring."); 578 } 579 return; 580 } 581 readerThreads.remove(eventHandler); 582 readerThread.close(); 583 } 584 585 private void dprint(String msg) 586 { 587 ORBUtility.dprint("SelectorImpl", msg); 588 } 589 590 protected void dprint(String msg, Throwable t) 591 { 592 dprint(msg); 593 t.printStackTrace(System.out); 594 } 595 596 // Private class to contain a SelectionKey and a SelectionKey op. 597 // Used only by SelectorImpl to register and enable SelectionKey 598 // Op. 599 // REVISIT - Could do away with this class and use the EventHanlder 600 // directly. 601 private class SelectionKeyAndOp 602 { 603 // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT] 604 public int keyOp; 605 public SelectionKey selectionKey; 606 607 // constructor 608 public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) { 609 this.selectionKey = selectionKey; 610 this.keyOp = keyOp; 611 } 612 } 613 614// End of file. 615} 616