SelectorImpl.java revision 667:d0315150c39d
1/* 2 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26package com.sun.corba.se.impl.transport; 27 28import java.io.IOException; 29import java.nio.channels.ClosedChannelException; 30import java.nio.channels.SelectableChannel; 31import java.nio.channels.SelectionKey; 32import java.nio.channels.Selector; 33import java.util.ArrayList; 34import java.util.HashMap; 35import java.util.Map; 36import java.util.Iterator; 37import java.util.List; 38 39import com.sun.corba.se.pept.broker.Broker; 40import com.sun.corba.se.pept.transport.Acceptor; 41import com.sun.corba.se.pept.transport.Connection; 42import com.sun.corba.se.pept.transport.EventHandler; 43import com.sun.corba.se.pept.transport.ListenerThread; 44import com.sun.corba.se.pept.transport.ReaderThread; 45 46import com.sun.corba.se.spi.logging.CORBALogDomains; 47import com.sun.corba.se.spi.orb.ORB; 48import com.sun.corba.se.spi.orbutil.threadpool.Work; 49import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 50import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 51 52import com.sun.corba.se.impl.logging.ORBUtilSystemException; 53import com.sun.corba.se.impl.orbutil.ORBUtility; 54 55/** 56 * @author Harold Carr 57 */ 58class SelectorImpl 59 extends 60 ManagedLocalsThread 61 implements 62 com.sun.corba.se.pept.transport.Selector 63{ 64 private ORB orb; 65 private Selector selector; 66 private long timeout; 67 private List deferredRegistrations; 68 private List interestOpsList; 69 private HashMap listenerThreads; 70 private Map readerThreads; 71 private boolean selectorStarted; 72 private volatile boolean closed; 73 private ORBUtilSystemException wrapper; 74 75 76 public SelectorImpl(ORB orb) 77 { 78 this.orb = orb; 79 selector = null; 80 selectorStarted = false; 81 timeout = 60000; 82 deferredRegistrations = new ArrayList(); 83 interestOpsList = new ArrayList(); 84 listenerThreads = new HashMap(); 85 readerThreads = java.util.Collections.synchronizedMap(new HashMap()); 86 closed = false; 87 wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT); 88 } 89 90 public void setTimeout(long timeout) 91 { 92 this.timeout = timeout; 93 } 94 95 public long getTimeout() 96 { 97 return timeout; 98 } 99 100 public void registerInterestOps(EventHandler eventHandler) 101 { 102 if (orb.transportDebugFlag) { 103 dprint(".registerInterestOps:-> " + eventHandler); 104 } 105 106 SelectionKey selectionKey = eventHandler.getSelectionKey(); 107 if (selectionKey.isValid()) { 108 int ehOps = eventHandler.getInterestOps(); 109 SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps); 110 synchronized(interestOpsList) { 111 interestOpsList.add(keyAndOp); 112 } 113 // tell Selector Thread there's an update to a SelectorKey's Ops 114 selector.wakeup(); 115 } 116 else { 117 wrapper.selectionKeyInvalid(eventHandler.toString()); 118 if (orb.transportDebugFlag) { 119 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler); 120 } 121 } 122 123 if (orb.transportDebugFlag) { 124 dprint(".registerInterestOps:<- "); 125 } 126 } 127 128 public void registerForEvent(EventHandler eventHandler) 129 { 130 if (orb.transportDebugFlag) { 131 dprint(".registerForEvent: " + eventHandler); 132 } 133 134 if (isClosed()) { 135 if (orb.transportDebugFlag) { 136 dprint(".registerForEvent: closed: " + eventHandler); 137 } 138 return; 139 } 140 141 if (eventHandler.shouldUseSelectThreadToWait()) { 142 synchronized (deferredRegistrations) { 143 deferredRegistrations.add(eventHandler); 144 } 145 if (! selectorStarted) { 146 startSelector(); 147 } 148 selector.wakeup(); 149 return; 150 } 151 152 switch (eventHandler.getInterestOps()) { 153 case SelectionKey.OP_ACCEPT : 154 createListenerThread(eventHandler); 155 break; 156 case SelectionKey.OP_READ : 157 createReaderThread(eventHandler); 158 break; 159 default: 160 if (orb.transportDebugFlag) { 161 dprint(".registerForEvent: default: " + eventHandler); 162 } 163 throw new RuntimeException( 164 "SelectorImpl.registerForEvent: unknown interest ops"); 165 } 166 } 167 168 public void unregisterForEvent(EventHandler eventHandler) 169 { 170 if (orb.transportDebugFlag) { 171 dprint(".unregisterForEvent: " + eventHandler); 172 } 173 174 if (isClosed()) { 175 if (orb.transportDebugFlag) { 176 dprint(".unregisterForEvent: closed: " + eventHandler); 177 } 178 return; 179 } 180 181 if (eventHandler.shouldUseSelectThreadToWait()) { 182 SelectionKey selectionKey ; 183 synchronized(deferredRegistrations) { 184 selectionKey = eventHandler.getSelectionKey(); 185 } 186 if (selectionKey != null) { 187 selectionKey.cancel(); 188 } 189 selector.wakeup(); 190 return; 191 } 192 193 switch (eventHandler.getInterestOps()) { 194 case SelectionKey.OP_ACCEPT : 195 destroyListenerThread(eventHandler); 196 break; 197 case SelectionKey.OP_READ : 198 destroyReaderThread(eventHandler); 199 break; 200 default: 201 if (orb.transportDebugFlag) { 202 dprint(".unregisterForEvent: default: " + eventHandler); 203 } 204 throw new RuntimeException( 205 "SelectorImpl.uregisterForEvent: unknown interest ops"); 206 } 207 } 208 209 public void close() 210 { 211 if (orb.transportDebugFlag) { 212 dprint(".close"); 213 } 214 215 if (isClosed()) { 216 if (orb.transportDebugFlag) { 217 dprint(".close: already closed"); 218 } 219 return; 220 } 221 222 setClosed(true); 223 224 Iterator i; 225 226 // Kill listeners. 227 228 i = listenerThreads.values().iterator(); 229 while (i.hasNext()) { 230 ListenerThread listenerThread = (ListenerThread) i.next(); 231 listenerThread.close(); 232 } 233 234 // Kill readers. 235 236 i = readerThreads.values().iterator(); 237 while (i.hasNext()) { 238 ReaderThread readerThread = (ReaderThread) i.next(); 239 readerThread.close(); 240 } 241 242 // Selector 243 244 try { 245 if (selector != null) { 246 // wakeup Selector thread to process close request 247 selector.wakeup(); 248 } 249 } catch (Throwable t) { 250 if (orb.transportDebugFlag) { 251 dprint(".close: selector.close: " + t); 252 } 253 } 254 } 255 256 /////////////////////////////////////////////////// 257 // 258 // Thread methods. 259 // 260 261 public void run() 262 { 263 setName("SelectorThread"); 264 while (!closed) { 265 try { 266 int n = 0; 267 if (timeout == 0 && orb.transportDebugFlag) { 268 dprint(".run: Beginning of selection cycle"); 269 } 270 handleDeferredRegistrations(); 271 enableInterestOps(); 272 try { 273 n = selector.select(timeout); 274 } catch (IOException e) { 275 if (orb.transportDebugFlag) { 276 dprint(".run: selector.select: " + e); 277 } 278 } 279 if (closed) { 280 selector.close(); 281 if (orb.transportDebugFlag) { 282 dprint(".run: closed - .run return"); 283 } 284 return; 285 } 286 /* 287 if (timeout == 0 && orb.transportDebugFlag) { 288 dprint(".run: selector.select() returned: " + n); 289 } 290 if (n == 0) { 291 continue; 292 } 293 */ 294 Iterator iterator = selector.selectedKeys().iterator(); 295 if (orb.transportDebugFlag) { 296 if (iterator.hasNext()) { 297 dprint(".run: n = " + n); 298 } 299 } 300 while (iterator.hasNext()) { 301 SelectionKey selectionKey = (SelectionKey) iterator.next(); 302 iterator.remove(); 303 EventHandler eventHandler = (EventHandler) 304 selectionKey.attachment(); 305 try { 306 eventHandler.handleEvent(); 307 } catch (Throwable t) { 308 if (orb.transportDebugFlag) { 309 dprint(".run: eventHandler.handleEvent", t); 310 } 311 } 312 } 313 if (timeout == 0 && orb.transportDebugFlag) { 314 dprint(".run: End of selection cycle"); 315 } 316 } catch (Throwable t) { 317 // IMPORTANT: ignore all errors so the select thread keeps running. 318 // Otherwise a guaranteed hang. 319 if (orb.transportDebugFlag) { 320 dprint(".run: ignoring", t); 321 } 322 } 323 } 324 } 325 326 ///////////////////////////////////////////////////// 327 // 328 // Implementation. 329 // 330 331 private synchronized boolean isClosed () 332 { 333 return closed; 334 } 335 336 private synchronized void setClosed(boolean closed) 337 { 338 this.closed = closed; 339 } 340 341 private void startSelector() 342 { 343 try { 344 selector = Selector.open(); 345 } catch (IOException e) { 346 if (orb.transportDebugFlag) { 347 dprint(".startSelector: Selector.open: IOException: " + e); 348 } 349 // REVISIT - better handling/reporting 350 RuntimeException rte = 351 new RuntimeException(".startSelector: Selector.open exception"); 352 rte.initCause(e); 353 throw rte; 354 } 355 setDaemon(true); 356 start(); 357 selectorStarted = true; 358 if (orb.transportDebugFlag) { 359 dprint(".startSelector: selector.start completed."); 360 } 361 } 362 363 private void handleDeferredRegistrations() 364 { 365 synchronized (deferredRegistrations) { 366 int deferredListSize = deferredRegistrations.size(); 367 for (int i = 0; i < deferredListSize; i++) { 368 EventHandler eventHandler = 369 (EventHandler)deferredRegistrations.get(i); 370 if (orb.transportDebugFlag) { 371 dprint(".handleDeferredRegistrations: " + eventHandler); 372 } 373 SelectableChannel channel = eventHandler.getChannel(); 374 SelectionKey selectionKey = null; 375 try { 376 selectionKey = 377 channel.register(selector, 378 eventHandler.getInterestOps(), 379 (Object)eventHandler); 380 } catch (ClosedChannelException e) { 381 if (orb.transportDebugFlag) { 382 dprint(".handleDeferredRegistrations: " + e); 383 } 384 } 385 eventHandler.setSelectionKey(selectionKey); 386 } 387 deferredRegistrations.clear(); 388 } 389 } 390 391 private void enableInterestOps() 392 { 393 synchronized (interestOpsList) { 394 int listSize = interestOpsList.size(); 395 if (listSize > 0) { 396 if (orb.transportDebugFlag) { 397 dprint(".enableInterestOps:->"); 398 } 399 SelectionKey selectionKey = null; 400 SelectionKeyAndOp keyAndOp = null; 401 int keyOp, selectionKeyOps = 0; 402 for (int i = 0; i < listSize; i++) { 403 keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i); 404 selectionKey = keyAndOp.selectionKey; 405 406 // Need to check if the SelectionKey is valid because a 407 // connection's SelectionKey could be put on the list to 408 // have its OP enabled and before it's enabled be reclaimed. 409 // Otherwise, the enabling of the OP will throw an exception 410 // here and exit this method an potentially not enable all 411 // registered ops. 412 // 413 // So, we ignore SelectionKeys that are invalid. They will get 414 // cleaned up on the next Selector.select() call. 415 416 if (selectionKey.isValid()) { 417 if (orb.transportDebugFlag) { 418 dprint(".enableInterestOps: " + keyAndOp); 419 } 420 keyOp = keyAndOp.keyOp; 421 selectionKeyOps = selectionKey.interestOps(); 422 selectionKey.interestOps(selectionKeyOps | keyOp); 423 } 424 } 425 interestOpsList.clear(); 426 if (orb.transportDebugFlag) { 427 dprint(".enableInterestOps:<-"); 428 } 429 } 430 } 431 } 432 433 private void createListenerThread(EventHandler eventHandler) 434 { 435 if (orb.transportDebugFlag) { 436 dprint(".createListenerThread: " + eventHandler); 437 } 438 Acceptor acceptor = eventHandler.getAcceptor(); 439 ListenerThread listenerThread = 440 new ListenerThreadImpl(orb, acceptor, this); 441 listenerThreads.put(eventHandler, listenerThread); 442 Throwable throwable = null; 443 try { 444 orb.getThreadPoolManager().getThreadPool(0) 445 .getWorkQueue(0).addWork((Work)listenerThread); 446 } catch (NoSuchThreadPoolException e) { 447 throwable = e; 448 } catch (NoSuchWorkQueueException e) { 449 throwable = e; 450 } 451 if (throwable != null) { 452 RuntimeException rte = new RuntimeException(throwable.toString()); 453 rte.initCause(throwable); 454 throw rte; 455 } 456 } 457 458 private void destroyListenerThread(EventHandler eventHandler) 459 { 460 if (orb.transportDebugFlag) { 461 dprint(".destroyListenerThread: " + eventHandler); 462 } 463 ListenerThread listenerThread = (ListenerThread) 464 listenerThreads.get(eventHandler); 465 if (listenerThread == null) { 466 if (orb.transportDebugFlag) { 467 dprint(".destroyListenerThread: cannot find ListenerThread - ignoring."); 468 } 469 return; 470 } 471 listenerThreads.remove(eventHandler); 472 listenerThread.close(); 473 } 474 475 private void createReaderThread(EventHandler eventHandler) 476 { 477 if (orb.transportDebugFlag) { 478 dprint(".createReaderThread: " + eventHandler); 479 } 480 Connection connection = eventHandler.getConnection(); 481 ReaderThread readerThread = 482 new ReaderThreadImpl(orb, connection, this); 483 readerThreads.put(eventHandler, readerThread); 484 Throwable throwable = null; 485 try { 486 orb.getThreadPoolManager().getThreadPool(0) 487 .getWorkQueue(0).addWork((Work)readerThread); 488 } catch (NoSuchThreadPoolException e) { 489 throwable = e; 490 } catch (NoSuchWorkQueueException e) { 491 throwable = e; 492 } 493 if (throwable != null) { 494 RuntimeException rte = new RuntimeException(throwable.toString()); 495 rte.initCause(throwable); 496 throw rte; 497 } 498 } 499 500 private void destroyReaderThread(EventHandler eventHandler) 501 { 502 if (orb.transportDebugFlag) { 503 dprint(".destroyReaderThread: " + eventHandler); 504 } 505 ReaderThread readerThread = (ReaderThread) 506 readerThreads.get(eventHandler); 507 if (readerThread == null) { 508 if (orb.transportDebugFlag) { 509 dprint(".destroyReaderThread: cannot find ReaderThread - ignoring."); 510 } 511 return; 512 } 513 readerThreads.remove(eventHandler); 514 readerThread.close(); 515 } 516 517 private void dprint(String msg) 518 { 519 ORBUtility.dprint("SelectorImpl", msg); 520 } 521 522 protected void dprint(String msg, Throwable t) 523 { 524 dprint(msg); 525 t.printStackTrace(System.out); 526 } 527 528 // Private class to contain a SelectionKey and a SelectionKey op. 529 // Used only by SelectorImpl to register and enable SelectionKey 530 // Op. 531 // REVISIT - Could do away with this class and use the EventHanlder 532 // directly. 533 private class SelectionKeyAndOp 534 { 535 // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT] 536 public int keyOp; 537 public SelectionKey selectionKey; 538 539 // constructor 540 public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) { 541 this.selectionKey = selectionKey; 542 this.keyOp = keyOp; 543 } 544 } 545 546// End of file. 547} 548