// SocketOrChannelAcceptorImpl.java revision 744:a98f572e2ceb
1/*
2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.transport;
27
28import java.io.IOException;
29import java.net.InetSocketAddress;
30import java.net.ServerSocket;
31import java.net.Socket;
32import java.nio.channels.SelectableChannel;
33import java.nio.channels.SelectionKey;
34import java.nio.channels.ServerSocketChannel;
35import java.nio.channels.SocketChannel;
36import java.util.Iterator;
37
38import com.sun.corba.se.pept.broker.Broker;
39import com.sun.corba.se.pept.encoding.InputObject;
40import com.sun.corba.se.pept.encoding.OutputObject;
41import com.sun.corba.se.pept.protocol.MessageMediator;
42import com.sun.corba.se.pept.transport.Acceptor;
43import com.sun.corba.se.pept.transport.Connection;
44import com.sun.corba.se.pept.transport.ContactInfo;
45import com.sun.corba.se.pept.transport.EventHandler;
46import com.sun.corba.se.pept.transport.InboundConnectionCache;
47import com.sun.corba.se.pept.transport.Selector;
48
49import com.sun.corba.se.spi.extension.RequestPartitioningPolicy;
50import com.sun.corba.se.spi.ior.IORTemplate;
51import com.sun.corba.se.spi.ior.TaggedProfileTemplate;
52import com.sun.corba.se.spi.ior.iiop.IIOPAddress ;
53import com.sun.corba.se.spi.ior.iiop.IIOPFactories;
54import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate ;
55import com.sun.corba.se.spi.ior.iiop.GIOPVersion ;
56import com.sun.corba.se.spi.ior.iiop.AlternateIIOPAddressComponent;
57import com.sun.corba.se.spi.logging.CORBALogDomains;
58import com.sun.corba.se.spi.orb.ORB;
59import com.sun.corba.se.spi.orbutil.threadpool.Work;
60import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
61import com.sun.corba.se.spi.transport.CorbaAcceptor;
62import com.sun.corba.se.spi.transport.CorbaConnection;
63import com.sun.corba.se.spi.transport.SocketInfo;
64import com.sun.corba.se.spi.transport.SocketOrChannelAcceptor;
65
66import com.sun.corba.se.impl.encoding.CDRInputObject;
67import com.sun.corba.se.impl.encoding.CDROutputObject;
68import com.sun.corba.se.impl.logging.ORBUtilSystemException;
69import com.sun.corba.se.impl.oa.poa.Policies; // REVISIT impl/poa specific
70import com.sun.corba.se.impl.orbutil.ORBConstants;
71import com.sun.corba.se.impl.orbutil.ORBUtility;
72
73// BEGIN Legacy support.
74import com.sun.corba.se.spi.legacy.connection.LegacyServerSocketEndPointInfo;
75// END Legacy support.
76
77/**
78 * @author Harold Carr
79 */
80public class SocketOrChannelAcceptorImpl
81    extends
82        EventHandlerBase
83    implements
84        CorbaAcceptor,
85        SocketOrChannelAcceptor,
86        Work,
87        // BEGIN Legacy
88        SocketInfo,
89        LegacyServerSocketEndPointInfo
90        // END Legacy
91{
// Channel backing serverSocket; assigned from serverSocket.getChannel() in
// internalInitialize() and left null when the socket has no channel.
92    protected ServerSocketChannel serverSocketChannel;
// Listening socket created by the ORB's socket factory in initialize().
93    protected ServerSocket serverSocket;
// Port given to the constructor; replaced by the socket's local port in
// internalInitialize().
94    protected int port;
// Value stored by setEnqueueTime() and returned by getEnqueueTime().
95    protected long enqueueTime;
// Set to true once initialize() has completed successfully.
96    protected boolean initialized;
// Logging/exception wrapper obtained for the RPC_TRANSPORT log domain.
97    protected ORBUtilSystemException wrapper ;
// Cache of accepted connections, supplied through setConnectionCache().
98    protected InboundConnectionCache connectionCache;
99
100    // BEGIN Legacy
// Socket type handed to the socket factory's createServerSocket().
101    protected String type = "";
// Endpoint name; the constructor sets it to NO_NAME unless a name is given,
// in which case getName() falls back to toString().
102    protected String name = "";
// Server host read from the ORB data in the constructor.
103    protected String hostname;
// Locator port; the constructor initializes it to -1.
104    protected int locatorPort;
105    // END Legacy
106
/**
 * Creates an acceptor for the given ORB without an explicit port.
 *
 * Stores the ORB, obtains the RPC-transport exception wrapper, registers
 * this instance as its own unit of work, and seeds the legacy endpoint
 * fields (server host from the ORB data, no name, locator port -1).
 *
 * @param orb the ORB this acceptor belongs to
 */
public SocketOrChannelAcceptorImpl(ORB orb) {
    this.orb = orb;
    this.wrapper =
        ORBUtilSystemException.get(orb, CORBALogDomains.RPC_TRANSPORT);

    setWork(this);
    this.initialized = false;

    // BEGIN Legacy support.
    this.locatorPort = -1;
    this.name = LegacyServerSocketEndPointInfo.NO_NAME;
    this.hostname = orb.getORBData().getORBServerHost();
    // END Legacy support.
}
122
/**
 * Creates an acceptor for the given ORB that will listen on {@code port}.
 *
 * @param orb  the ORB this acceptor belongs to
 * @param port the port to bind in {@link #initialize()}
 */
public SocketOrChannelAcceptorImpl(ORB orb, int port) {
    this(orb);
    this.port = port;
}
128
129    // BEGIN Legacy support.
/**
 * Creates an acceptor with an explicit legacy endpoint name and socket type.
 *
 * @param orb  the ORB this acceptor belongs to
 * @param port the port to bind in {@link #initialize()}
 * @param name the legacy endpoint name, returned by {@link #getName()}
 *             unless it equals the NO_NAME marker
 * @param type the socket type passed to the ORB's socket factory
 */
public SocketOrChannelAcceptorImpl(ORB orb, int port,
                                   String name, String type) {
    this(orb, port);
    this.type = type;
    this.name = name;
}
137    // END Legacy support.
138
139    ////////////////////////////////////////////////////
140    //
141    // pept.transport.Acceptor
142    //
143
144    public boolean initialize()
145    {
146        if (initialized) {
147            return false;
148        }
149        if (orb.transportDebugFlag) {
150            dprint(".initialize: " + this);
151        }
152        InetSocketAddress inetSocketAddress = null;
153        try {
154            if (orb.getORBData().getListenOnAllInterfaces().equals(ORBConstants.LISTEN_ON_ALL_INTERFACES)) {
155                inetSocketAddress = new InetSocketAddress(port);
156            } else {
157                String host = orb.getORBData().getORBServerHost();
158                inetSocketAddress = new InetSocketAddress(host, port);
159            }
160            serverSocket = orb.getORBData().getSocketFactory()
161                .createServerSocket(type, inetSocketAddress);
162            internalInitialize();
163        } catch (Throwable t) {
164            throw wrapper.createListenerFailed( t, Integer.toString(port) ) ;
165        }
166        initialized = true;
167        return true;
168    }
169
170    protected void internalInitialize()
171        throws Exception
172    {
173        // Determine the listening port (for the IOR).
174        // This is important when using emphemeral ports (i.e.,
175        // when the port value to the constructor is 0).
176
177        port = serverSocket.getLocalPort();
178
179        // Register with transport (also sets up monitoring).
180
181        orb.getCorbaTransportManager().getInboundConnectionCache(this);
182
183        // Finish configuation.
184
185        serverSocketChannel = serverSocket.getChannel();
186
187        if (serverSocketChannel != null) {
188            setUseSelectThreadToWait(
189                orb.getORBData().acceptorSocketUseSelectThreadToWait());
190            serverSocketChannel.configureBlocking(
191                ! orb.getORBData().acceptorSocketUseSelectThreadToWait());
192        } else {
193            // Configure to use listener and reader threads.
194            setUseSelectThreadToWait(false);
195        }
196        setUseWorkerThreadForEvent(
197            orb.getORBData().acceptorSocketUseWorkerThreadForEvent());
198
199    }
200
201    public boolean initialized()
202    {
203        return initialized;
204    }
205
206    public String getConnectionCacheType()
207    {
208        return this.getClass().toString();
209    }
210
211    public void setConnectionCache(InboundConnectionCache connectionCache)
212    {
213        this.connectionCache = connectionCache;
214    }
215
216    public InboundConnectionCache getConnectionCache()
217    {
218        return connectionCache;
219    }
220
221    public boolean shouldRegisterAcceptEvent()
222    {
223        return true;
224    }
225
226    public void accept()
227    {
228        try {
229            SocketChannel socketChannel = null;
230            Socket socket = null;
231            if (serverSocketChannel == null) {
232                socket = serverSocket.accept();
233            } else {
234                socketChannel = serverSocketChannel.accept();
235                socket = socketChannel.socket();
236            }
237            orb.getORBData().getSocketFactory()
238                .setAcceptedSocketOptions(this, serverSocket, socket);
239            if (orb.transportDebugFlag) {
240                dprint(".accept: " +
241                       (serverSocketChannel == null
242                        ? serverSocket.toString()
243                        : serverSocketChannel.toString()));
244            }
245
246            CorbaConnection connection =
247                new SocketOrChannelConnectionImpl(orb, this, socket);
248            if (orb.transportDebugFlag) {
249                dprint(".accept: new: " + connection);
250            }
251
252            // NOTE: The connection MUST be put in the cache BEFORE being
253            // registered with the selector.  Otherwise if the bytes
254            // are read on the connection it will attempt a time stamp
255            // but the cache will be null, resulting in NPE.
256
257            // A connection needs to be timestamped before putting to the cache.
258            // Otherwise the newly created connection (with 0 timestamp) could be
259            // incorrectly reclaimed by concurrent reclaim() call OR if there
260            // will be no events on this connection then it could be reclaimed
261            // by upcoming reclaim() call.
262            getConnectionCache().stampTime(connection);
263            getConnectionCache().put(this, connection);
264
265            if (connection.shouldRegisterServerReadEvent()) {
266                Selector selector = orb.getTransportManager().getSelector(0);
267                if (selector != null) {
268                    if (orb.transportDebugFlag) {
269                        dprint(".accept: registerForEvent: " + connection);
270                    }
271                    selector.registerForEvent(connection.getEventHandler());
272                }
273            }
274
275            getConnectionCache().reclaim();
276
277        } catch (IOException e) {
278            if (orb.transportDebugFlag) {
279                dprint(".accept:", e);
280            }
281            Selector selector = orb.getTransportManager().getSelector(0);
282            if (selector != null) {
283                selector.unregisterForEvent(this);
284                // REVISIT - need to close - recreate - then register new one.
285                selector.registerForEvent(this);
286                // NOTE: if register cycling we do not want to shut down ORB
287                // since local beans will still work.  Instead one will see
288                // a growing log file to alert admin of problem.
289            }
290        }
291    }
292
293    public void close ()
294    {
295        try {
296            if (orb.transportDebugFlag) {
297                dprint(".close->:");
298            }
299            Selector selector = orb.getTransportManager().getSelector(0);
300            if (selector != null) {
301                selector.unregisterForEvent(this);
302            }
303            if (serverSocketChannel != null) {
304                serverSocketChannel.close();
305            }
306            if (serverSocket != null) {
307                serverSocket.close();
308            }
309        } catch (IOException e) {
310            if (orb.transportDebugFlag) {
311                dprint(".close:", e);
312            }
313        } finally {
314            if (orb.transportDebugFlag) {
315                dprint(".close<-:");
316            }
317        }
318    }
319
320    public EventHandler getEventHandler()
321    {
322        return this;
323    }
324
325    ////////////////////////////////////////////////////
326    //
327    // CorbaAcceptor
328    //
329
330    public String getObjectAdapterId()
331    {
332        return null;
333    }
334
335    public String getObjectAdapterManagerId()
336    {
337        return null;
338    }
339
340    public void addToIORTemplate(IORTemplate iorTemplate,
341                                 Policies policies,
342                                 String codebase)
343    {
344        Iterator iterator = iorTemplate.iteratorById(
345            org.omg.IOP.TAG_INTERNET_IOP.value);
346
347        String hostname = orb.getORBData().getORBServerHost();
348
349        if (iterator.hasNext()) {
350            // REVISIT - how does this play with legacy ORBD port exchange?
351            IIOPAddress iiopAddress =
352                IIOPFactories.makeIIOPAddress(orb, hostname, port);
353            AlternateIIOPAddressComponent iiopAddressComponent =
354                IIOPFactories.makeAlternateIIOPAddressComponent(iiopAddress);
355
356            while (iterator.hasNext()) {
357                TaggedProfileTemplate taggedProfileTemplate =
358                    (TaggedProfileTemplate) iterator.next();
359                taggedProfileTemplate.add(iiopAddressComponent);
360            }
361        } else {
362            GIOPVersion version = orb.getORBData().getGIOPVersion();
363            int templatePort;
364            if (policies.forceZeroPort()) {
365                templatePort = 0;
366            } else if (policies.isTransient()) {
367                templatePort = port;
368            } else {
369                templatePort = orb.getLegacyServerSocketManager()
370                   .legacyGetPersistentServerPort(SocketInfo.IIOP_CLEAR_TEXT);
371            }
372            IIOPAddress addr =
373                IIOPFactories.makeIIOPAddress(orb, hostname, templatePort);
374            IIOPProfileTemplate iiopProfile =
375                IIOPFactories.makeIIOPProfileTemplate(orb, version, addr);
376            if (version.supportsIORIIOPProfileComponents()) {
377                iiopProfile.add(IIOPFactories.makeCodeSetsComponent(orb));
378                iiopProfile.add(IIOPFactories.makeMaxStreamFormatVersionComponent());
379                RequestPartitioningPolicy rpPolicy = (RequestPartitioningPolicy)
380                    policies.get_effective_policy(
381                                      ORBConstants.REQUEST_PARTITIONING_POLICY);
382                if (rpPolicy != null) {
383                    iiopProfile.add(
384                         IIOPFactories.makeRequestPartitioningComponent(
385                             rpPolicy.getValue()));
386                }
387                if (codebase != null && codebase != "") {
388                    iiopProfile.add(IIOPFactories. makeJavaCodebaseComponent(codebase));
389                }
390                if (orb.getORBData().isJavaSerializationEnabled()) {
391                    iiopProfile.add(
392                           IIOPFactories.makeJavaSerializationComponent());
393                }
394            }
395            iorTemplate.add(iiopProfile);
396        }
397    }
398
399    public String getMonitoringName()
400    {
401        return "AcceptedConnections";
402    }
403
404    ////////////////////////////////////////////////////
405    //
406    // EventHandler methods
407    //
408
409    public SelectableChannel getChannel()
410    {
411        return serverSocketChannel;
412    }
413
414    public int getInterestOps()
415    {
416        return SelectionKey.OP_ACCEPT;
417    }
418
419    public Acceptor getAcceptor()
420    {
421        return this;
422    }
423
424    public Connection getConnection()
425    {
426        throw new RuntimeException("Should not happen.");
427    }
428
429    ////////////////////////////////////////////////////
430    //
431    // Work methods.
432    //
433
434    /* CONFLICT: with legacy below.
435    public String getName()
436    {
437        return this.toString();
438    }
439    */
440
441    public void doWork()
442    {
443        try {
444            if (orb.transportDebugFlag) {
445                dprint(".doWork->: " + this);
446            }
447            if (selectionKey.isAcceptable()) {
448                        accept();
449            } else {
450                if (orb.transportDebugFlag) {
451                    dprint(".doWork: ! selectionKey.isAcceptable: " + this);
452                }
453            }
454        } catch (SecurityException se) {
455            if (orb.transportDebugFlag) {
456                dprint(".doWork: ignoring SecurityException: "
457                       + se
458                       + " " + this);
459            }
460            String permissionStr = ORBUtility.getClassSecurityInfo(getClass());
461            wrapper.securityExceptionInAccept(se, permissionStr);
462        } catch (Exception ex) {
463            if (orb.transportDebugFlag) {
464                dprint(".doWork: ignoring Exception: "
465                       + ex
466                       + " " + this);
467            }
468            wrapper.exceptionInAccept(ex);
469        } catch (Throwable t) {
470            if (orb.transportDebugFlag) {
471                dprint(".doWork: ignoring Throwable: "
472                       + t
473                       + " " + this);
474            }
475        } finally {
476
477            // IMPORTANT: To avoid bug (4953599), we force the
478            // Thread that does the NIO select to also do the
479            // enable/disable of Ops using SelectionKey.interestOps().
480            // Otherwise, the SelectionKey.interestOps() may block
481            // indefinitely.
482            // NOTE: If "acceptorSocketUseWorkerThreadForEvent" is
483            // set to to false in ParserTable.java, then this method,
484            // doWork(), will get executed by the same thread
485            // (SelectorThread) that does the NIO select.
486            // If "acceptorSocketUseWorkerThreadForEvent" is set
487            // to true, a WorkerThread will execute this method,
488            // doWork(). Hence, the registering of the enabling of
489            // the SelectionKey's interestOps is done here instead
490            // of calling SelectionKey.interestOps(<interest op>).
491
492            Selector selector = orb.getTransportManager().getSelector(0);
493            if (selector != null) {
494                selector.registerInterestOps(this);
495            }
496
497            if (orb.transportDebugFlag) {
498                dprint(".doWork<-:" + this);
499            }
500        }
501    }
502
503    public void setEnqueueTime(long timeInMillis)
504    {
505        enqueueTime = timeInMillis;
506    }
507
508    public long getEnqueueTime()
509    {
510        return enqueueTime;
511    }
512
513
514    //
515    // Factory methods.
516    //
517
518    // REVISIT: refactor into common base or delegate.
519    public MessageMediator createMessageMediator(Broker broker,
520                                                 Connection connection)
521    {
522        // REVISIT - no factoring so cheat to avoid code dup right now.
523        // REVISIT **** COUPLING !!!!
524        ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
525        return contactInfo.createMessageMediator(broker, connection);
526    }
527
528    // REVISIT: refactor into common base or delegate.
529    public MessageMediator finishCreatingMessageMediator(Broker broker,
530                                                         Connection connection,
531                                                         MessageMediator messageMediator)
532    {
533        // REVISIT - no factoring so cheat to avoid code dup right now.
534        // REVISIT **** COUPLING !!!!
535        ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
536        return contactInfo.finishCreatingMessageMediator(broker,
537                                          connection, messageMediator);
538    }
539
540    public InputObject createInputObject(Broker broker,
541                                         MessageMediator messageMediator)
542    {
543        CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
544            messageMediator;
545        return new CDRInputObject((ORB)broker,
546                                  (CorbaConnection)messageMediator.getConnection(),
547                                  corbaMessageMediator.getDispatchBuffer(),
548                                  corbaMessageMediator.getDispatchHeader());
549    }
550
551    public OutputObject createOutputObject(Broker broker,
552                                           MessageMediator messageMediator)
553    {
554        CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
555            messageMediator;
556        return sun.corba.OutputStreamFactory.newCDROutputObject((ORB) broker,
557                       corbaMessageMediator, corbaMessageMediator.getReplyHeader(),
558                       corbaMessageMediator.getStreamFormatVersion());
559    }
560
561    ////////////////////////////////////////////////////
562    //
563    // SocketOrChannelAcceptor
564    //
565
566    public ServerSocket getServerSocket()
567    {
568        return serverSocket;
569    }
570
571    ////////////////////////////////////////////////////
572    //
573    // Implementation.
574    //
575
576    public String toString()
577    {
578        String sock;
579        if (serverSocketChannel == null) {
580            if (serverSocket == null) {
581                sock = "(not initialized)";
582            } else {
583                sock = serverSocket.toString();
584            }
585        } else {
586            sock = serverSocketChannel.toString();
587        }
588
589        return
590            toStringName() +
591            "["
592            + sock + " "
593            + type + " "
594            + shouldUseSelectThreadToWait() + " "
595            + shouldUseWorkerThreadForEvent()
596            + "]" ;
597    }
598
599    protected String toStringName()
600    {
601        return "SocketOrChannelAcceptorImpl";
602    }
603
604    protected void dprint(String msg)
605    {
606        ORBUtility.dprint(toStringName(), msg);
607    }
608
609    protected void dprint(String msg, Throwable t)
610    {
611        dprint(msg);
612        t.printStackTrace(System.out);
613    }
614
615    // BEGIN Legacy support
616    ////////////////////////////////////////////////////
617    //
618    // LegacyServerSocketEndPointInfo and EndPointInfo
619    //
620
621    public String getType()
622    {
623        return type;
624    }
625
626    public String getHostName()
627    {
628        return hostname;
629    }
630
631    public String getHost()
632    {
633        return hostname;
634    }
635
636    public int getPort()
637    {
638        return port;
639    }
640
641    public int getLocatorPort()
642    {
643        return locatorPort;
644    }
645
646    public void setLocatorPort (int port)
647    {
648        locatorPort = port;
649    }
650
651    public String getName()
652    {
653        // Kluge alert:
654        // Work and Legacy both define getName.
655        // Try to make this behave best for most cases.
656        String result =
657            name.equals(LegacyServerSocketEndPointInfo.NO_NAME) ?
658            this.toString() : name;
659        return result;
660    }
661    // END Legacy support
662}
663
664// End of file.
665