SocketOrChannelAcceptorImpl.java revision 608:7e06bf1dcb09
/*
 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package com.sun.corba.se.impl.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.encoding.InputObject;
import com.sun.corba.se.pept.encoding.OutputObject;
import com.sun.corba.se.pept.protocol.MessageMediator;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
import com.sun.corba.se.pept.transport.ContactInfo;
import com.sun.corba.se.pept.transport.EventHandler;
import com.sun.corba.se.pept.transport.InboundConnectionCache;
import com.sun.corba.se.pept.transport.Selector;

import com.sun.corba.se.spi.extension.RequestPartitioningPolicy;
import com.sun.corba.se.spi.ior.IORTemplate;
import com.sun.corba.se.spi.ior.TaggedProfileTemplate;
import com.sun.corba.se.spi.ior.iiop.IIOPAddress ;
import com.sun.corba.se.spi.ior.iiop.IIOPFactories;
import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate ;
import com.sun.corba.se.spi.ior.iiop.GIOPVersion ;
import com.sun.corba.se.spi.ior.iiop.AlternateIIOPAddressComponent;
import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
import com.sun.corba.se.spi.transport.CorbaAcceptor;
import com.sun.corba.se.spi.transport.CorbaConnection;
import com.sun.corba.se.spi.transport.SocketInfo;
import com.sun.corba.se.spi.transport.SocketOrChannelAcceptor;

import com.sun.corba.se.impl.encoding.CDRInputObject;
import com.sun.corba.se.impl.encoding.CDROutputObject;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.oa.poa.Policies; // REVISIT impl/poa specific
import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.impl.orbutil.ORBUtility;

// BEGIN Legacy support.
import com.sun.corba.se.spi.legacy.connection.LegacyServerSocketEndPointInfo;
// END Legacy support.

/**
 * Acceptor that listens on a {@link ServerSocket} (which may or may not be
 * backed by a {@link ServerSocketChannel}) and wraps every accepted socket
 * in a {@link SocketOrChannelConnectionImpl}.
 *
 * <p>An instance acts as its own event handler ({@link #getEventHandler()}
 * returns {@code this}) and registers itself as its own unit of work in the
 * constructor, so {@link #doWork()} is what runs when an accept event is
 * dispatched.
 *
 * @author Harold Carr
 */
public class SocketOrChannelAcceptorImpl
    extends
        EventHandlerBase
    implements
        CorbaAcceptor,
        SocketOrChannelAcceptor,
        Work,
        // BEGIN Legacy
        SocketInfo,
        LegacyServerSocketEndPointInfo
        // END Legacy
{
    /** Channel of {@link #serverSocket}, or null when the socket has none. */
    protected ServerSocketChannel serverSocketChannel;
    /** Listening socket; null until {@link #initialize()} has created it. */
    protected ServerSocket serverSocket;
    /** Requested port; replaced by the actual local port on initialization. */
    protected int port;
    protected long enqueueTime;
    protected boolean initialized;
    protected ORBUtilSystemException wrapper ;
    protected InboundConnectionCache connectionCache;

    // BEGIN Legacy
    protected String type = "";
    protected String name = "";
    protected String hostname;
    protected int locatorPort;
    // END Legacy

    /**
     * Creates an uninitialized acceptor bound to the given ORB.
     * The port keeps its default value of 0.
     *
     * @param orb the ORB this acceptor belongs to
     */
    public SocketOrChannelAcceptorImpl(ORB orb)
    {
        this.orb = orb;
        wrapper = ORBUtilSystemException.get( orb,
            CORBALogDomains.RPC_TRANSPORT ) ;

        setWork(this);
        initialized = false;

        // BEGIN Legacy support.
        this.hostname = orb.getORBData().getORBServerHost();
        this.name = LegacyServerSocketEndPointInfo.NO_NAME;
        this.locatorPort = -1;
        // END Legacy support.
    }

    /**
     * Creates an uninitialized acceptor that will listen on {@code port}.
     *
     * @param orb  the ORB this acceptor belongs to
     * @param port the port to listen on
     */
    public SocketOrChannelAcceptorImpl(ORB orb, int port)
    {
        this(orb);
        this.port = port;
    }

    // BEGIN Legacy support.
    /**
     * Creates an uninitialized acceptor with a legacy name and socket type.
     *
     * @param orb  the ORB this acceptor belongs to
     * @param port the port to listen on
     * @param name the legacy end point name
     * @param type the socket type handed to the socket factory
     */
    public SocketOrChannelAcceptorImpl(ORB orb, int port,
                                       String name, String type)
    {
        this(orb, port);
        this.name = name;
        this.type = type;
    }
    // END Legacy support.

    ////////////////////////////////////////////////////
    //
    // pept.transport.Acceptor
    //

    /**
     * Creates the listening socket and finishes configuration.
     *
     * <p>If configuration fails after the server socket was created, the
     * socket is closed again before the failure is reported, so the port is
     * not left held by a half-initialized acceptor.
     *
     * @return {@code false} if this acceptor was already initialized,
     *         {@code true} once initialization succeeded
     */
    public boolean initialize()
    {
        if (initialized) {
            return false;
        }
        if (orb.transportDebugFlag) {
            dprint(".initialize: " + this);
        }
        InetSocketAddress inetSocketAddress = null;
        // Tracks the socket created by THIS call so that only it is cleaned
        // up on failure.
        ServerSocket createdSocket = null;
        try {
            if (orb.getORBData().getListenOnAllInterfaces().equals(ORBConstants.LISTEN_ON_ALL_INTERFACES)) {
                inetSocketAddress = new InetSocketAddress(port);
            } else {
                String host = orb.getORBData().getORBServerHost();
                inetSocketAddress = new InetSocketAddress(host, port);
            }
            createdSocket = orb.getORBData().getSocketFactory()
                .createServerSocket(type, inetSocketAddress);
            serverSocket = createdSocket;
            internalInitialize();
        } catch (Throwable t) {
            closeQuietly(createdSocket);
            throw wrapper.createListenerFailed( t, Integer.toString(port) ) ;
        }
        initialized = true;
        return true;
    }

    /**
     * Completes setup once {@link #serverSocket} exists: records the actual
     * port, registers with the transport manager and selects the threading
     * mode.
     *
     * @throws Exception if any configuration step fails
     */
    protected void internalInitialize()
        throws Exception
    {
        // Determine the listening port (for the IOR).
        // This is important when using ephemeral ports (i.e.,
        // when the port value to the constructor is 0).

        port = serverSocket.getLocalPort();

        // Register with transport (also sets up monitoring).

        orb.getCorbaTransportManager().getInboundConnectionCache(this);

        // Finish configuration.

        serverSocketChannel = serverSocket.getChannel();

        if (serverSocketChannel != null) {
            setUseSelectThreadToWait(
                orb.getORBData().acceptorSocketUseSelectThreadToWait());
            // The channel is blocking exactly when the select thread is
            // NOT used to wait.
            serverSocketChannel.configureBlocking(
                ! orb.getORBData().acceptorSocketUseSelectThreadToWait());
        } else {
            // Configure to use listener and reader threads.
            setUseSelectThreadToWait(false);
        }
        setUseWorkerThreadForEvent(
            orb.getORBData().acceptorSocketUseWorkerThreadForEvent());

    }

    /** @return whether {@link #initialize()} has completed successfully */
    public boolean initialized()
    {
        return initialized;
    }

    /** @return the string form of this object's runtime class */
    public String getConnectionCacheType()
    {
        return this.getClass().toString();
    }

    public void setConnectionCache(InboundConnectionCache connectionCache)
    {
        this.connectionCache = connectionCache;
    }

    public InboundConnectionCache getConnectionCache()
    {
        return connectionCache;
    }

    /** @return always {@code true} */
    public boolean shouldRegisterAcceptEvent()
    {
        return true;
    }

    /**
     * Accepts one pending connection, caches it and, if the connection asks
     * for it, registers it with the selector for read events.
     *
     * <p>When the server channel is non-blocking and no connection is
     * pending, {@link ServerSocketChannel#accept()} returns {@code null};
     * in that case this method returns without doing anything.
     *
     * <p>On an {@link IOException} this acceptor is unregistered from and
     * re-registered with the selector. A socket that had been accepted but
     * not yet wrapped in a connection is closed so it is not leaked.
     */
    public void accept()
    {
        Socket socket = null;
        CorbaConnection connection = null;
        try {
            SocketChannel socketChannel = null;
            if (serverSocketChannel == null) {
                socket = serverSocket.accept();
            } else {
                socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    // Non-blocking channel with nothing pending: there is
                    // no socket to set up.
                    if (orb.transportDebugFlag) {
                        dprint(".accept: no pending connection: "
                               + serverSocketChannel);
                    }
                    return;
                }
                socket = socketChannel.socket();
            }
            orb.getORBData().getSocketFactory()
                .setAcceptedSocketOptions(this, serverSocket, socket);
            if (orb.transportDebugFlag) {
                dprint(".accept: " +
                       (serverSocketChannel == null
                        ? serverSocket.toString()
                        : serverSocketChannel.toString()));
            }

            connection =
                new SocketOrChannelConnectionImpl(orb, this, socket);
            if (orb.transportDebugFlag) {
                dprint(".accept: new: " + connection);
            }

            // NOTE: The connection MUST be put in the cache BEFORE being
            // registered with the selector.  Otherwise if the bytes
            // are read on the connection it will attempt a time stamp
            // but the cache will be null, resulting in NPE.

            // A connection needs to be timestamped before putting to the cache.
            // Otherwise the newly created connection (with 0 timestamp) could be
            // incorrectly reclaimed by concurrent reclaim() call OR if there
            // will be no events on this connection then it could be reclaimed
            // by upcoming reclaim() call.
            getConnectionCache().stampTime(connection);
            getConnectionCache().put(this, connection);

            if (connection.shouldRegisterServerReadEvent()) {
                Selector selector = orb.getTransportManager().getSelector(0);
                selector.registerForEvent(connection.getEventHandler());
            }

            getConnectionCache().reclaim();

        } catch (IOException e) {
            if (orb.transportDebugFlag) {
                dprint(".accept:", e);
            }
            if (connection == null) {
                // The socket never made it into a connection, so nothing
                // else owns it; release it here.
                closeQuietly(socket);
            }
            Selector selector = orb.getTransportManager().getSelector(0);
            selector.unregisterForEvent(this);
            // REVISIT - need to close - recreate - then register new one.
            selector.registerForEvent(this);
            // NOTE: if register cycling we do not want to shut down ORB
            // since local beans will still work.  Instead one will see
            // a growing log file to alert admin of problem.
        }
    }

    /**
     * Unregisters this acceptor from the selector and closes the listening
     * channel and socket. An {@link IOException} while closing is only
     * reported through the transport debug output.
     */
    public void close ()
    {
        try {
            if (orb.transportDebugFlag) {
                dprint(".close->:");
            }
            Selector selector = orb.getTransportManager().getSelector(0);
            selector.unregisterForEvent(this);
            closeListeningSockets();
        } catch (IOException e) {
            if (orb.transportDebugFlag) {
                dprint(".close:", e);
            }
        } finally {
            if (orb.transportDebugFlag) {
                dprint(".close<-:");
            }
        }
    }

    /**
     * Closes the server channel and the server socket, attempting the
     * socket even when closing the channel fails.
     */
    private void closeListeningSockets()
        throws IOException
    {
        try {
            if (serverSocketChannel != null) {
                serverSocketChannel.close();
            }
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }
    }

    /** Closes {@code socket} if non-null; a close failure is only debug-logged. */
    private void closeQuietly(ServerSocket socket)
    {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException e) {
            // Best effort: the caller is already reporting another failure.
            if (orb.transportDebugFlag) {
                dprint(".closeQuietly:", e);
            }
        }
    }

    /** Closes {@code socket} if non-null; a close failure is only debug-logged. */
    private void closeQuietly(Socket socket)
    {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException e) {
            // Best effort: the caller is already reporting another failure.
            if (orb.transportDebugFlag) {
                dprint(".closeQuietly:", e);
            }
        }
    }

    /** @return this acceptor, which is its own event handler */
    public EventHandler getEventHandler()
    {
        return this;
    }

    ////////////////////////////////////////////////////
    //
    // CorbaAcceptor
    //

    /** @return always {@code null} */
    public String getObjectAdapterId()
    {
        return null;
    }

    /** @return always {@code null} */
    public String getObjectAdapterManagerId()
    {
        return null;
    }

    /**
     * Adds this acceptor's address to an IOR template.
     *
     * <p>If the template already holds IIOP profiles, an alternate IIOP
     * address component for this acceptor's host and port is added to each
     * of them. Otherwise a new IIOP profile is created and added. Its port
     * is 0 when the policies force a zero port, this acceptor's port for
     * transient objects, and the legacy persistent server port for
     * {@code IIOP_CLEAR_TEXT} in all other cases.
     *
     * @param iorTemplate the template to extend
     * @param policies    the policies that drive port choice and components
     * @param codebase    Java codebase to advertise; {@code null} or empty
     *                    means no codebase component is added
     */
    public void addToIORTemplate(IORTemplate iorTemplate,
                                 Policies policies,
                                 String codebase)
    {
        Iterator<?> iterator = iorTemplate.iteratorById(
            org.omg.IOP.TAG_INTERNET_IOP.value);

        String hostname = orb.getORBData().getORBServerHost();

        if (iterator.hasNext()) {
            // REVISIT - how does this play with legacy ORBD port exchange?
            IIOPAddress iiopAddress =
                IIOPFactories.makeIIOPAddress(orb, hostname, port);
            AlternateIIOPAddressComponent iiopAddressComponent =
                IIOPFactories.makeAlternateIIOPAddressComponent(iiopAddress);

            while (iterator.hasNext()) {
                TaggedProfileTemplate taggedProfileTemplate =
                    (TaggedProfileTemplate) iterator.next();
                taggedProfileTemplate.add(iiopAddressComponent);
            }
        } else {
            GIOPVersion version = orb.getORBData().getGIOPVersion();
            int templatePort;
            if (policies.forceZeroPort()) {
                templatePort = 0;
            } else if (policies.isTransient()) {
                templatePort = port;
            } else {
                templatePort = orb.getLegacyServerSocketManager()
                   .legacyGetPersistentServerPort(SocketInfo.IIOP_CLEAR_TEXT);
            }
            IIOPAddress addr =
                IIOPFactories.makeIIOPAddress(orb, hostname, templatePort);
            IIOPProfileTemplate iiopProfile =
                IIOPFactories.makeIIOPProfileTemplate(orb, version, addr);
            if (version.supportsIORIIOPProfileComponents()) {
                iiopProfile.add(IIOPFactories.makeCodeSetsComponent(orb));
                iiopProfile.add(IIOPFactories.makeMaxStreamFormatVersionComponent());
                RequestPartitioningPolicy rpPolicy = (RequestPartitioningPolicy)
                    policies.get_effective_policy(
                                      ORBConstants.REQUEST_PARTITIONING_POLICY);
                if (rpPolicy != null) {
                    iiopProfile.add(
                         IIOPFactories.makeRequestPartitioningComponent(
                             rpPolicy.getValue()));
                }
                // Compare by content: a reference comparison against ""
                // lets a non-interned empty string through.
                if (codebase != null && !codebase.isEmpty()) {
                    iiopProfile.add(IIOPFactories.
                        makeJavaCodebaseComponent(codebase));
                }
                if (orb.getORBData().isJavaSerializationEnabled()) {
                    iiopProfile.add(
                           IIOPFactories.makeJavaSerializationComponent());
                }
            }
            iorTemplate.add(iiopProfile);
        }
    }

    public String getMonitoringName()
    {
        return "AcceptedConnections";
    }

    ////////////////////////////////////////////////////
    //
    // EventHandler methods
    //

    /** @return the server socket channel; null if the socket has none */
    public SelectableChannel getChannel()
    {
        return serverSocketChannel;
    }

    public int getInterestOps()
    {
        return SelectionKey.OP_ACCEPT;
    }

    public Acceptor getAcceptor()
    {
        return this;
    }

    /**
     * Not supported by an acceptor.
     *
     * @throws RuntimeException always
     */
    public Connection getConnection()
    {
        throw new RuntimeException("Should not happen.");
    }

    ////////////////////////////////////////////////////
    //
    // Work methods.
    //

    /* CONFLICT: with legacy below.
    public String getName()
    {
        return this.toString();
    }
    */

    /**
     * Handles one dispatched event: accepts a connection when the selection
     * key is acceptable, then always re-registers this acceptor's interest
     * ops with the selector.
     *
     * <p>No exception escapes the accept step. Security exceptions and
     * other exceptions are reported through the logging wrapper, and any
     * other {@link Throwable} is only written to the transport debug
     * output.
     */
    public void doWork()
    {
        try {
            if (orb.transportDebugFlag) {
                dprint(".doWork->: " + this);
            }
            if (selectionKey.isAcceptable()) {
                accept();
            } else {
                if (orb.transportDebugFlag) {
                    dprint(".doWork: ! selectionKey.isAcceptable: " + this);
                }
            }
        } catch (SecurityException se) {
            if (orb.transportDebugFlag) {
                dprint(".doWork: ignoring SecurityException: "
                       + se
                       + " " + this);
            }
            String permissionStr = ORBUtility.getClassSecurityInfo(getClass());
            wrapper.securityExceptionInAccept(se, permissionStr);
        } catch (Exception ex) {
            if (orb.transportDebugFlag) {
                dprint(".doWork: ignoring Exception: "
                       + ex
                       + " " + this);
            }
            wrapper.exceptionInAccept(ex);
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".doWork: ignoring Throwable: "
                       + t
                       + " " + this);
            }
        } finally {

            // IMPORTANT: To avoid bug (4953599), we force the
            // Thread that does the NIO select to also do the
            // enable/disable of Ops using SelectionKey.interestOps().
            // Otherwise, the SelectionKey.interestOps() may block
            // indefinitely.
            // NOTE: If "acceptorSocketUseWorkerThreadForEvent" is
            // set to false in ParserTable.java, then this method,
            // doWork(), will get executed by the same thread
            // (SelectorThread) that does the NIO select.
            // If "acceptorSocketUseWorkerThreadForEvent" is set
            // to true, a WorkerThread will execute this method,
            // doWork(). Hence, the registering of the enabling of
            // the SelectionKey's interestOps is done here instead
            // of calling SelectionKey.interestOps(<interest op>).

            Selector selector = orb.getTransportManager().getSelector(0);
            selector.registerInterestOps(this);

            if (orb.transportDebugFlag) {
                dprint(".doWork<-:" + this);
            }
        }
    }

    public void setEnqueueTime(long timeInMillis)
    {
        enqueueTime = timeInMillis;
    }

    public long getEnqueueTime()
    {
        return enqueueTime;
    }


    //
    // Factory methods.
    //

    // REVISIT: refactor into common base or delegate.
    /**
     * Creates a message mediator by delegating to a fresh
     * {@link SocketOrChannelContactInfoImpl}.
     */
    public MessageMediator createMessageMediator(Broker broker,
                                                 Connection connection)
    {
        // REVISIT - no factoring so cheat to avoid code dup right now.
        // REVISIT **** COUPLING !!!!
        ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
        return contactInfo.createMessageMediator(broker, connection);
    }

    // REVISIT: refactor into common base or delegate.
    /**
     * Finishes creating a message mediator by delegating to a fresh
     * {@link SocketOrChannelContactInfoImpl}.
     */
    public MessageMediator finishCreatingMessageMediator(Broker broker,
                                                         Connection connection,
                                                         MessageMediator messageMediator)
    {
        // REVISIT - no factoring so cheat to avoid code dup right now.
        // REVISIT **** COUPLING !!!!
        ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
        return contactInfo.finishCreatingMessageMediator(broker,
                                                         connection, messageMediator);
    }

    /**
     * Builds a CDR input object over the mediator's dispatch buffer and
     * dispatch header.
     *
     * @param broker          must be an {@link ORB}
     * @param messageMediator must be a {@link CorbaMessageMediator}
     */
    public InputObject createInputObject(Broker broker,
                                         MessageMediator messageMediator)
    {
        CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
            messageMediator;
        return new CDRInputObject((ORB)broker,
                                  (CorbaConnection)messageMediator.getConnection(),
                                  corbaMessageMediator.getDispatchBuffer(),
                                  corbaMessageMediator.getDispatchHeader());
    }

    /**
     * Builds a CDR output object from the mediator's reply header and
     * stream format version.
     *
     * @param broker          must be an {@link ORB}
     * @param messageMediator must be a {@link CorbaMessageMediator}
     */
    public OutputObject createOutputObject(Broker broker,
                                           MessageMediator messageMediator)
    {
        CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
            messageMediator;
        return sun.corba.OutputStreamFactory.newCDROutputObject((ORB) broker,
                       corbaMessageMediator, corbaMessageMediator.getReplyHeader(),
                       corbaMessageMediator.getStreamFormatVersion());
    }

    ////////////////////////////////////////////////////
    //
    // SocketOrChannelAcceptor
    //

    public ServerSocket getServerSocket()
    {
        return serverSocket;
    }

    ////////////////////////////////////////////////////
    //
    // Implementation.
    //

    /**
     * Describes this acceptor as
     * {@code name[socket type useSelectThread useWorkerThread]}, using
     * {@code (not initialized)} while no socket exists.
     */
    @Override
    public String toString()
    {
        String sock;
        if (serverSocketChannel == null) {
            if (serverSocket == null) {
                sock = "(not initialized)";
            } else {
                sock = serverSocket.toString();
            }
        } else {
            sock = serverSocketChannel.toString();
        }

        return
            toStringName() +
            "["
            + sock + " "
            + type + " "
            + shouldUseSelectThreadToWait() + " "
            + shouldUseWorkerThreadForEvent()
            + "]" ;
    }

    protected String toStringName()
    {
        return "SocketOrChannelAcceptorImpl";
    }

    protected void dprint(String msg)
    {
        ORBUtility.dprint(toStringName(), msg);
    }

    /** Debug-prints {@code msg}, then the stack trace of {@code t} to stdout. */
    protected void dprint(String msg, Throwable t)
    {
        dprint(msg);
        t.printStackTrace(System.out);
    }

    // BEGIN Legacy support
    ////////////////////////////////////////////////////
    //
    // LegacyServerSocketEndPointInfo and EndPointInfo
    //

    public String getType()
    {
        return type;
    }

    public String getHostName()
    {
        return hostname;
    }

    public String getHost()
    {
        return hostname;
    }

    public int getPort()
    {
        return port;
    }

    public int getLocatorPort()
    {
        return locatorPort;
    }

    public void setLocatorPort (int port)
    {
        locatorPort = port;
    }

    /**
     * @return the legacy name, or {@link #toString()} when the name is
     *         still {@code LegacyServerSocketEndPointInfo.NO_NAME}
     */
    public String getName()
    {
        // Kluge alert:
        // Work and Legacy both define getName.
        // Try to make this behave best for most cases.
        String result =
            name.equals(LegacyServerSocketEndPointInfo.NO_NAME) ?
            this.toString() : name;
        return result;
    }
    // END Legacy support
}

// End of file.