SocketOrChannelAcceptorImpl.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.transport;
27
28import java.io.IOException;
29import java.net.InetSocketAddress;
30import java.net.ServerSocket;
31import java.net.Socket;
32import java.nio.channels.SelectableChannel;
33import java.nio.channels.SelectionKey;
34import java.nio.channels.ServerSocketChannel;
35import java.nio.channels.SocketChannel;
36import java.util.Iterator;
37
38import com.sun.corba.se.pept.broker.Broker;
39import com.sun.corba.se.pept.encoding.InputObject;
40import com.sun.corba.se.pept.encoding.OutputObject;
41import com.sun.corba.se.pept.protocol.MessageMediator;
42import com.sun.corba.se.pept.transport.Acceptor;
43import com.sun.corba.se.pept.transport.Connection;
44import com.sun.corba.se.pept.transport.ContactInfo;
45import com.sun.corba.se.pept.transport.EventHandler;
46import com.sun.corba.se.pept.transport.InboundConnectionCache;
47import com.sun.corba.se.pept.transport.Selector;
48
49import com.sun.corba.se.spi.extension.RequestPartitioningPolicy;
50import com.sun.corba.se.spi.ior.IORTemplate;
51import com.sun.corba.se.spi.ior.TaggedProfileTemplate;
52import com.sun.corba.se.spi.ior.iiop.IIOPAddress ;
53import com.sun.corba.se.spi.ior.iiop.IIOPFactories;
54import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate ;
55import com.sun.corba.se.spi.ior.iiop.GIOPVersion ;
56import com.sun.corba.se.spi.ior.iiop.AlternateIIOPAddressComponent;
57import com.sun.corba.se.spi.logging.CORBALogDomains;
58import com.sun.corba.se.spi.orb.ORB;
59import com.sun.corba.se.spi.orbutil.threadpool.Work;
60import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
61import com.sun.corba.se.spi.transport.CorbaAcceptor;
62import com.sun.corba.se.spi.transport.CorbaConnection;
63import com.sun.corba.se.spi.transport.SocketInfo;
64import com.sun.corba.se.spi.transport.SocketOrChannelAcceptor;
65
66import com.sun.corba.se.impl.encoding.CDRInputObject;
67import com.sun.corba.se.impl.encoding.CDROutputObject;
68import com.sun.corba.se.impl.logging.ORBUtilSystemException;
69import com.sun.corba.se.impl.oa.poa.Policies; // REVISIT impl/poa specific
70import com.sun.corba.se.impl.orbutil.ORBConstants;
71import com.sun.corba.se.impl.orbutil.ORBUtility;
72
73// BEGIN Legacy support.
74import com.sun.corba.se.spi.legacy.connection.LegacyServerSocketEndPointInfo;
75// END Legacy support.
76
77/**
78 * @author Harold Carr
79 */
80public class SocketOrChannelAcceptorImpl
81    extends
82        EventHandlerBase
83    implements
84        CorbaAcceptor,
85        SocketOrChannelAcceptor,
86        Work,
87        // BEGIN Legacy
88        SocketInfo,
89        LegacyServerSocketEndPointInfo
90        // END Legacy
91{
92    protected ServerSocketChannel serverSocketChannel;
93    protected ServerSocket serverSocket;
94    protected int port;
95    protected long enqueueTime;
96    protected boolean initialized;
97    protected ORBUtilSystemException wrapper ;
98    protected InboundConnectionCache connectionCache;
99
100    // BEGIN Legacy
101    protected String type = "";
102    protected String name = "";
103    protected String hostname;
104    protected int locatorPort;
105    // END Legacy
106
107    public SocketOrChannelAcceptorImpl(ORB orb)
108    {
109        this.orb = orb;
110        wrapper = ORBUtilSystemException.get( orb,
111            CORBALogDomains.RPC_TRANSPORT ) ;
112
113        setWork(this);
114        initialized = false;
115
116        // BEGIN Legacy support.
117        this.hostname = orb.getORBData().getORBServerHost();
118        this.name = LegacyServerSocketEndPointInfo.NO_NAME;
119        this.locatorPort = -1;
120        // END Legacy support.
121    }
122
123    public SocketOrChannelAcceptorImpl(ORB orb, int port)
124    {
125        this(orb);
126        this.port = port;
127    }
128
129    // BEGIN Legacy support.
130    public SocketOrChannelAcceptorImpl(ORB orb, int port,
131                                       String name, String type)
132    {
133        this(orb, port);
134        this.name = name;
135        this.type = type;
136    }
137    // END Legacy support.
138
139    ////////////////////////////////////////////////////
140    //
141    // pept.transport.Acceptor
142    //
143
144    public boolean initialize()
145    {
146        if (initialized) {
147            return false;
148        }
149        if (orb.transportDebugFlag) {
150            dprint(".initialize: " + this);
151        }
152        InetSocketAddress inetSocketAddress = null;
153        try {
154            if (orb.getORBData().getListenOnAllInterfaces().equals(ORBConstants.LISTEN_ON_ALL_INTERFACES)) {
155                inetSocketAddress = new InetSocketAddress(port);
156            } else {
157                String host = orb.getORBData().getORBServerHost();
158                inetSocketAddress = new InetSocketAddress(host, port);
159            }
160            serverSocket = orb.getORBData().getSocketFactory()
161                .createServerSocket(type, inetSocketAddress);
162            internalInitialize();
163        } catch (Throwable t) {
164            throw wrapper.createListenerFailed( t, Integer.toString(port) ) ;
165        }
166        initialized = true;
167        return true;
168    }
169
170    protected void internalInitialize()
171        throws Exception
172    {
173        // Determine the listening port (for the IOR).
174        // This is important when using emphemeral ports (i.e.,
175        // when the port value to the constructor is 0).
176
177        port = serverSocket.getLocalPort();
178
179        // Register with transport (also sets up monitoring).
180
181        orb.getCorbaTransportManager().getInboundConnectionCache(this);
182
183        // Finish configuation.
184
185        serverSocketChannel = serverSocket.getChannel();
186
187        if (serverSocketChannel != null) {
188            setUseSelectThreadToWait(
189                orb.getORBData().acceptorSocketUseSelectThreadToWait());
190            serverSocketChannel.configureBlocking(
191                ! orb.getORBData().acceptorSocketUseSelectThreadToWait());
192        } else {
193            // Configure to use listener and reader threads.
194            setUseSelectThreadToWait(false);
195        }
196        setUseWorkerThreadForEvent(
197            orb.getORBData().acceptorSocketUseWorkerThreadForEvent());
198
199    }
200
201    public boolean initialized()
202    {
203        return initialized;
204    }
205
206    public String getConnectionCacheType()
207    {
208        return this.getClass().toString();
209    }
210
211    public void setConnectionCache(InboundConnectionCache connectionCache)
212    {
213        this.connectionCache = connectionCache;
214    }
215
216    public InboundConnectionCache getConnectionCache()
217    {
218        return connectionCache;
219    }
220
221    public boolean shouldRegisterAcceptEvent()
222    {
223        return true;
224    }
225
226    public void accept()
227    {
228        try {
229            SocketChannel socketChannel = null;
230            Socket socket = null;
231            if (serverSocketChannel == null) {
232                socket = serverSocket.accept();
233            } else {
234                socketChannel = serverSocketChannel.accept();
235                socket = socketChannel.socket();
236            }
237            orb.getORBData().getSocketFactory()
238                .setAcceptedSocketOptions(this, serverSocket, socket);
239            if (orb.transportDebugFlag) {
240                dprint(".accept: " +
241                       (serverSocketChannel == null
242                        ? serverSocket.toString()
243                        : serverSocketChannel.toString()));
244            }
245
246            CorbaConnection connection =
247                new SocketOrChannelConnectionImpl(orb, this, socket);
248            if (orb.transportDebugFlag) {
249                dprint(".accept: new: " + connection);
250            }
251
252            // NOTE: The connection MUST be put in the cache BEFORE being
253            // registered with the selector.  Otherwise if the bytes
254            // are read on the connection it will attempt a time stamp
255            // but the cache will be null, resulting in NPE.
256
257            // A connection needs to be timestamped before putting to the cache.
258            // Otherwise the newly created connection (with 0 timestamp) could be
259            // incorrectly reclaimed by concurrent reclaim() call OR if there
260            // will be no events on this connection then it could be reclaimed
261            // by upcoming reclaim() call.
262            getConnectionCache().stampTime(connection);
263            getConnectionCache().put(this, connection);
264
265            if (connection.shouldRegisterServerReadEvent()) {
266                Selector selector = orb.getTransportManager().getSelector(0);
267                selector.registerForEvent(connection.getEventHandler());
268            }
269
270            getConnectionCache().reclaim();
271
272        } catch (IOException e) {
273            if (orb.transportDebugFlag) {
274                dprint(".accept:", e);
275            }
276            orb.getTransportManager().getSelector(0).unregisterForEvent(this);
277            // REVISIT - need to close - recreate - then register new one.
278            orb.getTransportManager().getSelector(0).registerForEvent(this);
279            // NOTE: if register cycling we do not want to shut down ORB
280            // since local beans will still work.  Instead one will see
281            // a growing log file to alert admin of problem.
282        }
283    }
284
285    public void close ()
286    {
287        try {
288            if (orb.transportDebugFlag) {
289                dprint(".close->:");
290            }
291            Selector selector = orb.getTransportManager().getSelector(0);
292            selector.unregisterForEvent(this);
293            if (serverSocketChannel != null) {
294                serverSocketChannel.close();
295            }
296            if (serverSocket != null) {
297                serverSocket.close();
298            }
299        } catch (IOException e) {
300            if (orb.transportDebugFlag) {
301                dprint(".close:", e);
302            }
303        } finally {
304            if (orb.transportDebugFlag) {
305                dprint(".close<-:");
306            }
307        }
308    }
309
310    public EventHandler getEventHandler()
311    {
312        return this;
313    }
314
315    ////////////////////////////////////////////////////
316    //
317    // CorbaAcceptor
318    //
319
320    public String getObjectAdapterId()
321    {
322        return null;
323    }
324
325    public String getObjectAdapterManagerId()
326    {
327        return null;
328    }
329
330    public void addToIORTemplate(IORTemplate iorTemplate,
331                                 Policies policies,
332                                 String codebase)
333    {
334        Iterator iterator = iorTemplate.iteratorById(
335            org.omg.IOP.TAG_INTERNET_IOP.value);
336
337        String hostname = orb.getORBData().getORBServerHost();
338
339        if (iterator.hasNext()) {
340            // REVISIT - how does this play with legacy ORBD port exchange?
341            IIOPAddress iiopAddress =
342                IIOPFactories.makeIIOPAddress(orb, hostname, port);
343            AlternateIIOPAddressComponent iiopAddressComponent =
344                IIOPFactories.makeAlternateIIOPAddressComponent(iiopAddress);
345
346            while (iterator.hasNext()) {
347                TaggedProfileTemplate taggedProfileTemplate =
348                    (TaggedProfileTemplate) iterator.next();
349                taggedProfileTemplate.add(iiopAddressComponent);
350            }
351        } else {
352            GIOPVersion version = orb.getORBData().getGIOPVersion();
353            int templatePort;
354            if (policies.forceZeroPort()) {
355                templatePort = 0;
356            } else if (policies.isTransient()) {
357                templatePort = port;
358            } else {
359                templatePort = orb.getLegacyServerSocketManager()
360                   .legacyGetPersistentServerPort(SocketInfo.IIOP_CLEAR_TEXT);
361            }
362            IIOPAddress addr =
363                IIOPFactories.makeIIOPAddress(orb, hostname, templatePort);
364            IIOPProfileTemplate iiopProfile =
365                IIOPFactories.makeIIOPProfileTemplate(orb, version, addr);
366            if (version.supportsIORIIOPProfileComponents()) {
367                iiopProfile.add(IIOPFactories.makeCodeSetsComponent(orb));
368                iiopProfile.add(IIOPFactories.makeMaxStreamFormatVersionComponent());
369                RequestPartitioningPolicy rpPolicy = (RequestPartitioningPolicy)
370                    policies.get_effective_policy(
371                                      ORBConstants.REQUEST_PARTITIONING_POLICY);
372                if (rpPolicy != null) {
373                    iiopProfile.add(
374                         IIOPFactories.makeRequestPartitioningComponent(
375                             rpPolicy.getValue()));
376                }
377                if (codebase != null && codebase != "") {
378                    iiopProfile.add(IIOPFactories. makeJavaCodebaseComponent(codebase));
379                }
380                if (orb.getORBData().isJavaSerializationEnabled()) {
381                    iiopProfile.add(
382                           IIOPFactories.makeJavaSerializationComponent());
383                }
384            }
385            iorTemplate.add(iiopProfile);
386        }
387    }
388
389    public String getMonitoringName()
390    {
391        return "AcceptedConnections";
392    }
393
394    ////////////////////////////////////////////////////
395    //
396    // EventHandler methods
397    //
398
399    public SelectableChannel getChannel()
400    {
401        return serverSocketChannel;
402    }
403
404    public int getInterestOps()
405    {
406        return SelectionKey.OP_ACCEPT;
407    }
408
409    public Acceptor getAcceptor()
410    {
411        return this;
412    }
413
414    public Connection getConnection()
415    {
416        throw new RuntimeException("Should not happen.");
417    }
418
419    ////////////////////////////////////////////////////
420    //
421    // Work methods.
422    //
423
424    /* CONFLICT: with legacy below.
425    public String getName()
426    {
427        return this.toString();
428    }
429    */
430
431    public void doWork()
432    {
433        try {
434            if (orb.transportDebugFlag) {
435                dprint(".doWork->: " + this);
436            }
437            if (selectionKey.isAcceptable()) {
438                        accept();
439            } else {
440                if (orb.transportDebugFlag) {
441                    dprint(".doWork: ! selectionKey.isAcceptable: " + this);
442                }
443            }
444        } catch (SecurityException se) {
445            if (orb.transportDebugFlag) {
446                dprint(".doWork: ignoring SecurityException: "
447                       + se
448                       + " " + this);
449            }
450            String permissionStr = ORBUtility.getClassSecurityInfo(getClass());
451            wrapper.securityExceptionInAccept(se, permissionStr);
452        } catch (Exception ex) {
453            if (orb.transportDebugFlag) {
454                dprint(".doWork: ignoring Exception: "
455                       + ex
456                       + " " + this);
457            }
458            wrapper.exceptionInAccept(ex);
459        } catch (Throwable t) {
460            if (orb.transportDebugFlag) {
461                dprint(".doWork: ignoring Throwable: "
462                       + t
463                       + " " + this);
464            }
465        } finally {
466
467            // IMPORTANT: To avoid bug (4953599), we force the
468            // Thread that does the NIO select to also do the
469            // enable/disable of Ops using SelectionKey.interestOps().
470            // Otherwise, the SelectionKey.interestOps() may block
471            // indefinitely.
472            // NOTE: If "acceptorSocketUseWorkerThreadForEvent" is
473            // set to to false in ParserTable.java, then this method,
474            // doWork(), will get executed by the same thread
475            // (SelectorThread) that does the NIO select.
476            // If "acceptorSocketUseWorkerThreadForEvent" is set
477            // to true, a WorkerThread will execute this method,
478            // doWork(). Hence, the registering of the enabling of
479            // the SelectionKey's interestOps is done here instead
480            // of calling SelectionKey.interestOps(<interest op>).
481
482            Selector selector = orb.getTransportManager().getSelector(0);
483            selector.registerInterestOps(this);
484
485            if (orb.transportDebugFlag) {
486                dprint(".doWork<-:" + this);
487            }
488        }
489    }
490
491    public void setEnqueueTime(long timeInMillis)
492    {
493        enqueueTime = timeInMillis;
494    }
495
496    public long getEnqueueTime()
497    {
498        return enqueueTime;
499    }
500
501
502    //
503    // Factory methods.
504    //
505
506    // REVISIT: refactor into common base or delegate.
507    public MessageMediator createMessageMediator(Broker broker,
508                                                 Connection connection)
509    {
510        // REVISIT - no factoring so cheat to avoid code dup right now.
511        // REVISIT **** COUPLING !!!!
512        ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
513        return contactInfo.createMessageMediator(broker, connection);
514    }
515
516    // REVISIT: refactor into common base or delegate.
517    public MessageMediator finishCreatingMessageMediator(Broker broker,
518                                                         Connection connection,
519                                                         MessageMediator messageMediator)
520    {
521        // REVISIT - no factoring so cheat to avoid code dup right now.
522        // REVISIT **** COUPLING !!!!
523        ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
524        return contactInfo.finishCreatingMessageMediator(broker,
525                                          connection, messageMediator);
526    }
527
528    public InputObject createInputObject(Broker broker,
529                                         MessageMediator messageMediator)
530    {
531        CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
532            messageMediator;
533        return new CDRInputObject((ORB)broker,
534                                  (CorbaConnection)messageMediator.getConnection(),
535                                  corbaMessageMediator.getDispatchBuffer(),
536                                  corbaMessageMediator.getDispatchHeader());
537    }
538
539    public OutputObject createOutputObject(Broker broker,
540                                           MessageMediator messageMediator)
541    {
542        CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
543            messageMediator;
544        return sun.corba.OutputStreamFactory.newCDROutputObject((ORB) broker,
545                       corbaMessageMediator, corbaMessageMediator.getReplyHeader(),
546                       corbaMessageMediator.getStreamFormatVersion());
547    }
548
549    ////////////////////////////////////////////////////
550    //
551    // SocketOrChannelAcceptor
552    //
553
554    public ServerSocket getServerSocket()
555    {
556        return serverSocket;
557    }
558
559    ////////////////////////////////////////////////////
560    //
561    // Implementation.
562    //
563
564    public String toString()
565    {
566        String sock;
567        if (serverSocketChannel == null) {
568            if (serverSocket == null) {
569                sock = "(not initialized)";
570            } else {
571                sock = serverSocket.toString();
572            }
573        } else {
574            sock = serverSocketChannel.toString();
575        }
576
577        return
578            toStringName() +
579            "["
580            + sock + " "
581            + type + " "
582            + shouldUseSelectThreadToWait() + " "
583            + shouldUseWorkerThreadForEvent()
584            + "]" ;
585    }
586
587    protected String toStringName()
588    {
589        return "SocketOrChannelAcceptorImpl";
590    }
591
592    protected void dprint(String msg)
593    {
594        ORBUtility.dprint(toStringName(), msg);
595    }
596
597    protected void dprint(String msg, Throwable t)
598    {
599        dprint(msg);
600        t.printStackTrace(System.out);
601    }
602
603    // BEGIN Legacy support
604    ////////////////////////////////////////////////////
605    //
606    // LegacyServerSocketEndPointInfo and EndPointInfo
607    //
608
609    public String getType()
610    {
611        return type;
612    }
613
614    public String getHostName()
615    {
616        return hostname;
617    }
618
619    public String getHost()
620    {
621        return hostname;
622    }
623
624    public int getPort()
625    {
626        return port;
627    }
628
629    public int getLocatorPort()
630    {
631        return locatorPort;
632    }
633
634    public void setLocatorPort (int port)
635    {
636        locatorPort = port;
637    }
638
639    public String getName()
640    {
641        // Kluge alert:
642        // Work and Legacy both define getName.
643        // Try to make this behave best for most cases.
644        String result =
645            name.equals(LegacyServerSocketEndPointInfo.NO_NAME) ?
646            this.toString() : name;
647        return result;
648    }
649    // END Legacy support
650}
651
652// End of file.
653