SocketOrChannelConnectionImpl.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.transport;
27
28import java.io.IOException;
29import java.net.InetSocketAddress;
30import java.net.Socket;
31import java.nio.ByteBuffer;
32import java.nio.channels.SelectableChannel;
33import java.nio.channels.SelectionKey;
34import java.nio.channels.SocketChannel;
35import java.security.AccessController;
36import java.security.PrivilegedAction;
37import java.util.Collections;
38import java.util.Hashtable;
39import java.util.HashMap;
40import java.util.Map;
41
42import org.omg.CORBA.COMM_FAILURE;
43import org.omg.CORBA.CompletionStatus;
44import org.omg.CORBA.DATA_CONVERSION;
45import org.omg.CORBA.INTERNAL;
46import org.omg.CORBA.MARSHAL;
47import org.omg.CORBA.OBJECT_NOT_EXIST;
48import org.omg.CORBA.SystemException;
49
50import com.sun.org.omg.SendingContext.CodeBase;
51
52import com.sun.corba.se.pept.broker.Broker;
53import com.sun.corba.se.pept.encoding.InputObject;
54import com.sun.corba.se.pept.encoding.OutputObject;
55import com.sun.corba.se.pept.protocol.MessageMediator;
56import com.sun.corba.se.pept.transport.Acceptor;
57import com.sun.corba.se.pept.transport.Connection;
58import com.sun.corba.se.pept.transport.ConnectionCache;
59import com.sun.corba.se.pept.transport.ContactInfo;
60import com.sun.corba.se.pept.transport.EventHandler;
61import com.sun.corba.se.pept.transport.InboundConnectionCache;
62import com.sun.corba.se.pept.transport.OutboundConnectionCache;
63import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
64import com.sun.corba.se.pept.transport.Selector;
65
66import com.sun.corba.se.spi.ior.IOR;
67import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
68import com.sun.corba.se.spi.logging.CORBALogDomains;
69import com.sun.corba.se.spi.orb.ORB ;
70import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
71import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
72import com.sun.corba.se.spi.orbutil.threadpool.Work;
73import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
74import com.sun.corba.se.spi.transport.CorbaContactInfo;
75import com.sun.corba.se.spi.transport.CorbaConnection;
76import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
77import com.sun.corba.se.spi.transport.ReadTimeouts;
78
79import com.sun.corba.se.impl.encoding.CachedCodeBase;
80import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
81import com.sun.corba.se.impl.encoding.CDROutputObject;
82import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
83import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
84import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
85import com.sun.corba.se.impl.logging.ORBUtilSystemException;
86import com.sun.corba.se.impl.orbutil.ORBConstants;
87import com.sun.corba.se.impl.orbutil.ORBUtility;
88import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
89import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
90import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
91
92/**
93 * @author Harold Carr
94 */
95public class SocketOrChannelConnectionImpl
96    extends
97        EventHandlerBase
98    implements
99        CorbaConnection,
100        Work
101{
102    public static boolean dprintWriteLocks = false;
103
104    //
105    // New transport.
106    //
107
108    protected long enqueueTime;
109
110    protected SocketChannel socketChannel;
111    public SocketChannel getSocketChannel()
112    {
113        return socketChannel;
114    }
115
116    // REVISIT:
117    // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
118    protected CorbaContactInfo contactInfo;
119    protected Acceptor acceptor;
120    protected ConnectionCache connectionCache;
121
122    //
123    // From iiop.Connection.java
124    //
125
126    protected Socket socket;    // The socket used for this connection.
127    protected long timeStamp = 0;
128    protected boolean isServer = false;
129
130    // Start at some value other than zero since this is a magic
131    // value in some protocols.
132    protected int requestId = 5;
133    protected CorbaResponseWaitingRoom responseWaitingRoom;
134    protected int state;
135    protected java.lang.Object stateEvent = new java.lang.Object();
136    protected java.lang.Object writeEvent = new java.lang.Object();
137    protected boolean writeLocked;
138    protected int serverRequestCount = 0;
139
140    // Server request map: used on the server side of Connection
141    // Maps request ID to IIOPInputStream.
142    Map serverRequestMap = null;
143
144    // This is a flag associated per connection telling us if the
145    // initial set of sending contexts were sent to the receiver
146    // already...
147    protected boolean postInitialContexts = false;
148
149    // Remote reference to CodeBase server (supplies
150    // FullValueDescription, among other things)
151    protected IOR codeBaseServerIOR;
152
153    // CodeBase cache for this connection.  This will cache remote operations,
154    // handle connecting, and ensure we don't do any remote operations until
155    // necessary.
156    protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);
157
158    protected ORBUtilSystemException wrapper ;
159
160    // transport read timeout values
161    protected ReadTimeouts readTimeouts;
162
163    protected boolean shouldReadGiopHeaderOnly;
164
165    // A message mediator used when shouldReadGiopHeaderOnly is
166    // true to maintain request message state across execution in a
167    // SelectorThread and WorkerThread.
168    protected CorbaMessageMediator partialMessageMediator = null;
169
170    // Used in genericRPCMSGFramework test.
171    protected SocketOrChannelConnectionImpl(ORB orb)
172    {
173        this.orb = orb;
174        wrapper = ORBUtilSystemException.get( orb,
175            CORBALogDomains.RPC_TRANSPORT ) ;
176
177        setWork(this);
178        responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
179        setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
180    }
181
182    // Both client and servers.
183    protected SocketOrChannelConnectionImpl(ORB orb,
184                                            boolean useSelectThreadToWait,
185                                            boolean useWorkerThread)
186    {
187        this(orb) ;
188        setUseSelectThreadToWait(useSelectThreadToWait);
189        setUseWorkerThreadForEvent(useWorkerThread);
190    }
191
192    // Client constructor.
193    public SocketOrChannelConnectionImpl(ORB orb,
194                                         CorbaContactInfo contactInfo,
195                                         boolean useSelectThreadToWait,
196                                         boolean useWorkerThread,
197                                         String socketType,
198                                         String hostname,
199                                         int port)
200    {
201        this(orb, useSelectThreadToWait, useWorkerThread);
202
203        this.contactInfo = contactInfo;
204
205        try {
206            socket = orb.getORBData().getSocketFactory()
207                .createSocket(socketType,
208                              new InetSocketAddress(hostname, port));
209            socketChannel = socket.getChannel();
210
211            if (socketChannel != null) {
212                boolean isBlocking = !useSelectThreadToWait;
213                socketChannel.configureBlocking(isBlocking);
214            } else {
215                // IMPORTANT: non-channel-backed sockets must use
216                // dedicated reader threads.
217                setUseSelectThreadToWait(false);
218            }
219            if (orb.transportDebugFlag) {
220                dprint(".initialize: connection created: " + socket);
221            }
222        } catch (Throwable t) {
223            throw wrapper.connectFailure(t, socketType, hostname,
224                                         Integer.toString(port));
225        }
226        state = OPENING;
227    }
228
229    // Client-side convenience.
230    public SocketOrChannelConnectionImpl(ORB orb,
231                                         CorbaContactInfo contactInfo,
232                                         String socketType,
233                                         String hostname,
234                                         int port)
235    {
236        this(orb, contactInfo,
237             orb.getORBData().connectionSocketUseSelectThreadToWait(),
238             orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
239             socketType, hostname, port);
240    }
241
242    // Server-side constructor.
243    public SocketOrChannelConnectionImpl(ORB orb,
244                                         Acceptor acceptor,
245                                         Socket socket,
246                                         boolean useSelectThreadToWait,
247                                         boolean useWorkerThread)
248    {
249        this(orb, useSelectThreadToWait, useWorkerThread);
250
251        this.socket = socket;
252        socketChannel = socket.getChannel();
253        if (socketChannel != null) {
254            // REVISIT
255            try {
256                boolean isBlocking = !useSelectThreadToWait;
257                socketChannel.configureBlocking(isBlocking);
258            } catch (IOException e) {
259                RuntimeException rte = new RuntimeException();
260                rte.initCause(e);
261                throw rte;
262            }
263        }
264        this.acceptor = acceptor;
265
266        serverRequestMap = Collections.synchronizedMap(new HashMap());
267        isServer = true;
268
269        state = ESTABLISHED;
270    }
271
272    // Server-side convenience
273    public SocketOrChannelConnectionImpl(ORB orb,
274                                         Acceptor acceptor,
275                                         Socket socket)
276    {
277        this(orb, acceptor, socket,
278             (socket.getChannel() == null
279              ? false
280              : orb.getORBData().connectionSocketUseSelectThreadToWait()),
281             (socket.getChannel() == null
282              ? false
283              : orb.getORBData().connectionSocketUseWorkerThreadForEvent()));
284    }
285
286    ////////////////////////////////////////////////////
287    //
288    // framework.transport.Connection
289    //
290
291    public boolean shouldRegisterReadEvent()
292    {
293        return true;
294    }
295
296    public boolean shouldRegisterServerReadEvent()
297    {
298        return true;
299    }
300
301    public boolean read()
302    {
303        try {
304            if (orb.transportDebugFlag) {
305                dprint(".read->: " + this);
306            }
307            CorbaMessageMediator messageMediator = readBits();
308            if (messageMediator != null) {
309                // Null can happen when client closes stream
310                // causing purgecalls.
311                return dispatch(messageMediator);
312            }
313            return true;
314        } finally {
315            if (orb.transportDebugFlag) {
316                dprint(".read<-: " + this);
317            }
318        }
319    }
320
321    protected CorbaMessageMediator readBits()
322    {
323        try {
324
325            if (orb.transportDebugFlag) {
326                dprint(".readBits->: " + this);
327            }
328
329            MessageMediator messageMediator;
330            // REVISIT - use common factory base class.
331            if (contactInfo != null) {
332                messageMediator =
333                    contactInfo.createMessageMediator(orb, this);
334            } else if (acceptor != null) {
335                messageMediator = acceptor.createMessageMediator(orb, this);
336            } else {
337                throw
338                    new RuntimeException("SocketOrChannelConnectionImpl.readBits");
339            }
340            return (CorbaMessageMediator) messageMediator;
341
342        } catch (ThreadDeath td) {
343            if (orb.transportDebugFlag) {
344                dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
345            }
346            try {
347                purgeCalls(wrapper.connectionAbort(td), false, false);
348            } catch (Throwable t) {
349                if (orb.transportDebugFlag) {
350                    dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
351                }
352            }
353            throw td;
354        } catch (Throwable ex) {
355            if (orb.transportDebugFlag) {
356                dprint(".readBits: " + this + ": Throwable: " + ex, ex);
357            }
358
359            try {
360                if (ex instanceof INTERNAL) {
361                    sendMessageError(GIOPVersion.DEFAULT_VERSION);
362                }
363            } catch (IOException e) {
364                if (orb.transportDebugFlag) {
365                    dprint(".readBits: " + this +
366                           ": sendMessageError: IOException: " + e, e);
367                }
368            }
369            // REVISIT - make sure reader thread is killed.
370            orb.getTransportManager().getSelector(0).unregisterForEvent(this);
371            // Notify anyone waiting.
372            purgeCalls(wrapper.connectionAbort(ex), true, false);
373            // REVISIT
374            //keepRunning = false;
375            // REVISIT - if this is called after purgeCalls then
376            // the state of the socket is ABORT so the writeLock
377            // in close throws an exception.  It is ignored but
378            // causes IBM (screen scraping) tests to fail.
379            //close();
380        } finally {
381            if (orb.transportDebugFlag) {
382                dprint(".readBits<-: " + this);
383            }
384        }
385        return null;
386    }
387
388    protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
389    {
390        try {
391
392            if (orb.transportDebugFlag) {
393                dprint(".finishReadingBits->: " + this);
394            }
395
396            // REVISIT - use common factory base class.
397            if (contactInfo != null) {
398                messageMediator =
399                    contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
400            } else if (acceptor != null) {
401                messageMediator =
402                    acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
403            } else {
404                throw
405                    new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
406            }
407            return (CorbaMessageMediator) messageMediator;
408
409        } catch (ThreadDeath td) {
410            if (orb.transportDebugFlag) {
411                dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
412            }
413            try {
414                purgeCalls(wrapper.connectionAbort(td), false, false);
415            } catch (Throwable t) {
416                if (orb.transportDebugFlag) {
417                    dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
418                }
419            }
420            throw td;
421        } catch (Throwable ex) {
422            if (orb.transportDebugFlag) {
423                dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
424            }
425
426            try {
427                if (ex instanceof INTERNAL) {
428                    sendMessageError(GIOPVersion.DEFAULT_VERSION);
429                }
430            } catch (IOException e) {
431                if (orb.transportDebugFlag) {
432                    dprint(".finishReadingBits: " + this +
433                           ": sendMessageError: IOException: " + e, e);
434                }
435            }
436            // REVISIT - make sure reader thread is killed.
437            orb.getTransportManager().getSelector(0).unregisterForEvent(this);
438            // Notify anyone waiting.
439            purgeCalls(wrapper.connectionAbort(ex), true, false);
440            // REVISIT
441            //keepRunning = false;
442            // REVISIT - if this is called after purgeCalls then
443            // the state of the socket is ABORT so the writeLock
444            // in close throws an exception.  It is ignored but
445            // causes IBM (screen scraping) tests to fail.
446            //close();
447        } finally {
448            if (orb.transportDebugFlag) {
449                dprint(".finishReadingBits<-: " + this);
450            }
451        }
452        return null;
453    }
454
455    protected boolean dispatch(CorbaMessageMediator messageMediator)
456    {
457        try {
458            if (orb.transportDebugFlag) {
459                dprint(".dispatch->: " + this);
460            }
461
462            //
463            // NOTE:
464            //
465            // This call is the transition from the tranport block
466            // to the protocol block.
467            //
468
469            boolean result =
470                messageMediator.getProtocolHandler()
471                .handleRequest(messageMediator);
472
473            return result;
474
475        } catch (ThreadDeath td) {
476            if (orb.transportDebugFlag) {
477                dprint(".dispatch: ThreadDeath", td );
478            }
479            try {
480                purgeCalls(wrapper.connectionAbort(td), false, false);
481            } catch (Throwable t) {
482                if (orb.transportDebugFlag) {
483                    dprint(".dispatch: purgeCalls: Throwable", t);
484                }
485            }
486            throw td;
487        } catch (Throwable ex) {
488            if (orb.transportDebugFlag) {
489                dprint(".dispatch: Throwable", ex ) ;
490            }
491
492            try {
493                if (ex instanceof INTERNAL) {
494                    sendMessageError(GIOPVersion.DEFAULT_VERSION);
495                }
496            } catch (IOException e) {
497                if (orb.transportDebugFlag) {
498                    dprint(".dispatch: sendMessageError: IOException", e);
499                }
500            }
501            purgeCalls(wrapper.connectionAbort(ex), false, false);
502            // REVISIT
503            //keepRunning = false;
504        } finally {
505            if (orb.transportDebugFlag) {
506                dprint(".dispatch<-: " + this);
507            }
508        }
509
510        return true;
511    }
512
513    public boolean shouldUseDirectByteBuffers()
514    {
515        return getSocketChannel() != null;
516    }
517
518    public ByteBuffer read(int size, int offset, int length, long max_wait_time)
519        throws IOException
520    {
521        if (shouldUseDirectByteBuffers()) {
522
523            ByteBuffer byteBuffer =
524                orb.getByteBufferPool().getByteBuffer(size);
525
526            if (orb.transportDebugFlag) {
527                // print address of ByteBuffer gotten from pool
528                int bbAddress = System.identityHashCode(byteBuffer);
529                StringBuffer sb = new StringBuffer(80);
530                sb.append(".read: got ByteBuffer id (");
531                sb.append(bbAddress).append(") from ByteBufferPool.");
532                String msgStr = sb.toString();
533                dprint(msgStr);
534            }
535
536            byteBuffer.position(offset);
537            byteBuffer.limit(size);
538
539            readFully(byteBuffer, length, max_wait_time);
540
541            return byteBuffer;
542        }
543
544        byte[] buf = new byte[size];
545        readFully(getSocket().getInputStream(), buf,
546                  offset, length, max_wait_time);
547        ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
548        byteBuffer.limit(size);
549        return byteBuffer;
550    }
551
552    public ByteBuffer read(ByteBuffer byteBuffer, int offset,
553                           int length, long max_wait_time)
554        throws IOException
555    {
556        int size = offset + length;
557        if (shouldUseDirectByteBuffers()) {
558
559            if (! byteBuffer.isDirect()) {
560                throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
561            }
562            if (size > byteBuffer.capacity()) {
563                if (orb.transportDebugFlag) {
564                    // print address of ByteBuffer being released
565                    int bbAddress = System.identityHashCode(byteBuffer);
566                    StringBuffer bbsb = new StringBuffer(80);
567                    bbsb.append(".read: releasing ByteBuffer id (")
568                        .append(bbAddress).append(") to ByteBufferPool.");
569                    String bbmsg = bbsb.toString();
570                    dprint(bbmsg);
571                }
572                orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
573                byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
574            }
575            byteBuffer.position(offset);
576            byteBuffer.limit(size);
577            readFully(byteBuffer, length, max_wait_time);
578            byteBuffer.position(0);
579            byteBuffer.limit(size);
580            return byteBuffer;
581        }
582        if (byteBuffer.isDirect()) {
583            throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
584        }
585        byte[] buf = new byte[size];
586        readFully(getSocket().getInputStream(), buf,
587                  offset, length, max_wait_time);
588        return ByteBuffer.wrap(buf);
589    }
590
591    public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
592        throws IOException
593    {
594        int n = 0;
595        int bytecount = 0;
596        long time_to_wait = readTimeouts.get_initial_time_to_wait();
597        long total_time_in_wait = 0;
598
599        // The reading of data incorporates a strategy to detect a
600        // rogue client. The strategy is implemented as follows. As
601        // long as data is being read, at least 1 byte or more, we
602        // assume we have a well behaved client. If no data is read,
603        // then we sleep for a time to wait, re-calculate a new time to
604        // wait which is lengthier than the previous time spent waiting.
605        // Then, if the total time spent waiting does not exceed a
606        // maximum time we are willing to wait, we attempt another
607        // read. If the maximum amount of time we are willing to
608        // spend waiting for more data is exceeded, we throw an
609        // IOException.
610
611        // NOTE: Reading of GIOP headers are treated with a smaller
612        //       maximum time to wait threshold. Based on extensive
613        //       performance testing, all GIOP headers are being
614        //       read in 1 read access.
615
616        do {
617            bytecount = getSocketChannel().read(byteBuffer);
618
619            if (bytecount < 0) {
620                throw new IOException("End-of-stream");
621            }
622            else if (bytecount == 0) {
623                try {
624                    Thread.sleep(time_to_wait);
625                    total_time_in_wait += time_to_wait;
626                    time_to_wait =
627                        (long)(time_to_wait*readTimeouts.get_backoff_factor());
628                }
629                catch (InterruptedException ie) {
630                    // ignore exception
631                    if (orb.transportDebugFlag) {
632                        dprint("readFully(): unexpected exception "
633                                + ie.toString());
634                    }
635                }
636            }
637            else {
638                n += bytecount;
639            }
640        }
641        while (n < size && total_time_in_wait < max_wait_time);
642
643        if (n < size && total_time_in_wait >= max_wait_time)
644        {
645            // failed to read entire message
646            throw wrapper.transportReadTimeoutExceeded(new Integer(size),
647                                      new Integer(n), new Long(max_wait_time),
648                                      new Long(total_time_in_wait));
649        }
650
651        getConnectionCache().stampTime(this);
652    }
653
654    // To support non-channel connections.
655    public void readFully(java.io.InputStream is, byte[] buf,
656                          int offset, int size, long max_wait_time)
657        throws IOException
658    {
659        int n = 0;
660        int bytecount = 0;
661        long time_to_wait = readTimeouts.get_initial_time_to_wait();
662        long total_time_in_wait = 0;
663
664        // The reading of data incorporates a strategy to detect a
665        // rogue client. The strategy is implemented as follows. As
666        // long as data is being read, at least 1 byte or more, we
667        // assume we have a well behaved client. If no data is read,
668        // then we sleep for a time to wait, re-calculate a new time to
669        // wait which is lengthier than the previous time spent waiting.
670        // Then, if the total time spent waiting does not exceed a
671        // maximum time we are willing to wait, we attempt another
672        // read. If the maximum amount of time we are willing to
673        // spend waiting for more data is exceeded, we throw an
674        // IOException.
675
676        // NOTE: Reading of GIOP headers are treated with a smaller
677        //       maximum time to wait threshold. Based on extensive
678        //       performance testing, all GIOP headers are being
679        //       read in 1 read access.
680
681        do {
682            bytecount = is.read(buf, offset + n, size - n);
683            if (bytecount < 0) {
684                throw new IOException("End-of-stream");
685            }
686            else if (bytecount == 0) {
687                try {
688                    Thread.sleep(time_to_wait);
689                    total_time_in_wait += time_to_wait;
690                    time_to_wait =
691                        (long)(time_to_wait*readTimeouts.get_backoff_factor());
692                }
693                catch (InterruptedException ie) {
694                    // ignore exception
695                    if (orb.transportDebugFlag) {
696                        dprint("readFully(): unexpected exception "
697                                + ie.toString());
698                    }
699                }
700            }
701            else {
702                n += bytecount;
703            }
704        }
705        while (n < size && total_time_in_wait < max_wait_time);
706
707        if (n < size && total_time_in_wait >= max_wait_time)
708        {
709            // failed to read entire message
710            throw wrapper.transportReadTimeoutExceeded(new Integer(size),
711                                      new Integer(n), new Long(max_wait_time),
712                                      new Long(total_time_in_wait));
713        }
714
715        getConnectionCache().stampTime(this);
716    }
717
718    public void write(ByteBuffer byteBuffer)
719        throws IOException
720    {
721        if (shouldUseDirectByteBuffers()) {
722            /* NOTE: cannot perform this test.  If one ask for a
723               ByteBuffer from the pool which is bigger than the size
724               of ByteBuffers managed by the pool, then the pool will
725               return a HeapByteBuffer.
726            if (byteBuffer.hasArray()) {
727                throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
728            }
729            */
730            // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
731            //            all bytes are written on first write attempt.
732            do {
733                getSocketChannel().write(byteBuffer);
734            }
735            while (byteBuffer.hasRemaining());
736
737        } else {
738            if (! byteBuffer.hasArray()) {
739                throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
740            }
741            byte[] tmpBuf = byteBuffer.array();
742            getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
743            getSocket().getOutputStream().flush();
744        }
745
746        // TimeStamp connection to indicate it has been used
747        // Note granularity of connection usage is assumed for
748        // now to be that of a IIOP packet.
749        getConnectionCache().stampTime(this);
750    }
751
752    /**
753     * Note:it is possible for this to be called more than once
754     */
755    public synchronized void close()
756    {
757        try {
758            if (orb.transportDebugFlag) {
759                dprint(".close->: " + this);
760            }
761            writeLock();
762
763            // REVISIT It will be good to have a read lock on the reader thread
764            // before we proceed further, to avoid the reader thread (server side)
765            // from processing requests. This avoids the risk that a new request
766            // will be accepted by ReaderThread while the ListenerThread is
767            // attempting to close this connection.
768
769            if (isBusy()) { // we are busy!
770                writeUnlock();
771                if (orb.transportDebugFlag) {
772                    dprint(".close: isBusy so no close: " + this);
773                }
774                return;
775            }
776
777            try {
778                try {
779                    sendCloseConnection(GIOPVersion.V1_0);
780                } catch (Throwable t) {
781                    wrapper.exceptionWhenSendingCloseConnection(t);
782                }
783
784                synchronized ( stateEvent ){
785                    state = CLOSE_SENT;
786                    stateEvent.notifyAll();
787                }
788
789                // stop the reader without causing it to do purgeCalls
790                //Exception ex = new Exception();
791                //reader.stop(ex); // REVISIT
792
793                // NOTE: !!!!!!
794                // This does writeUnlock().
795                purgeCalls(wrapper.connectionRebind(), false, true);
796
797            } catch (Exception ex) {
798                if (orb.transportDebugFlag) {
799                    dprint(".close: exception: " + this, ex);
800                }
801            }
802            try {
803                Selector selector = orb.getTransportManager().getSelector(0);
804                selector.unregisterForEvent(this);
805                if (socketChannel != null) {
806                    socketChannel.close();
807                }
808                socket.close();
809            } catch (IOException e) {
810                if (orb.transportDebugFlag) {
811                    dprint(".close: " + this, e);
812                }
813            }
814            closeConnectionResources();
815        } finally {
816            if (orb.transportDebugFlag) {
817                dprint(".close<-: " + this);
818            }
819        }
820    }
821
822    public void closeConnectionResources() {
823           if (orb.transportDebugFlag) {
824               dprint(".closeConnectionResources->: " + this);
825           }
826           Selector selector = orb.getTransportManager().getSelector(0);
827           selector.unregisterForEvent(this);
828           try {
829             if (socketChannel != null)
830              socketChannel.close() ;
831                if (socket != null && !socket.isClosed())
832                socket.close() ;
833           } catch (IOException e) {
834             if (orb.transportDebugFlag) {
835                 dprint( ".closeConnectionResources: " + this, e ) ;
836             }
837           }
838           if (orb.transportDebugFlag) {
839               dprint(".closeConnectionResources<-: " + this);
840           }
841     }
842
843
844    public Acceptor getAcceptor()
845    {
846        return acceptor;
847    }
848
849    public ContactInfo getContactInfo()
850    {
851        return contactInfo;
852    }
853
    /**
     * @return this connection, which acts as its own event handler
     */
    public EventHandler getEventHandler()
    {
        return this;
    }
858
859    public OutputObject createOutputObject(MessageMediator messageMediator)
860    {
861        // REVISIT - remove this method from Connection and all it subclasses.
862        throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
863    }
864
865    // This is used by the GIOPOutputObject in order to
866    // throw the correct error when handling code sets.
867    // Can we determine if we are on the server side by
868    // other means?  XREVISIT
869    public boolean isServer()
870    {
871        return isServer;
872    }
873
874    public boolean isBusy()
875    {
876        if (serverRequestCount > 0 ||
877            getResponseWaitingRoom().numberRegistered() > 0)
878        {
879            return true;
880        } else {
881            return false;
882        }
883    }
884
885    public long getTimeStamp()
886    {
887        return timeStamp;
888    }
889
890    public void setTimeStamp(long time)
891    {
892        timeStamp = time;
893    }
894
895    public void setState(String stateString)
896    {
897        synchronized (stateEvent) {
898            if (stateString.equals("ESTABLISHED")) {
899                state =  ESTABLISHED;
900                stateEvent.notifyAll();
901            } else {
902                // REVISIT: ASSERT
903            }
904        }
905    }
906
907    /**
908     * Sets the writeLock for this connection.
909     * If the writeLock is already set by someone else, block till the
910     * writeLock is released and can set by us.
911     * IMPORTANT: this connection's lock must be acquired before
912     * setting the writeLock and must be unlocked after setting the writeLock.
913     */
914    public void writeLock()
915    {
916      try {
917        if (dprintWriteLocks && orb.transportDebugFlag) {
918            dprint(".writeLock->: " + this);
919        }
920        // Keep looping till we can set the writeLock.
921        while ( true ) {
922            int localState = state;
923            switch ( localState ) {
924
925            case OPENING:
926                synchronized (stateEvent) {
927                    if (state != OPENING) {
928                        // somebody has changed 'state' so be careful
929                        break;
930                    }
931                    try {
932                        stateEvent.wait();
933                    } catch (InterruptedException ie) {
934                        if (orb.transportDebugFlag) {
935                            dprint(".writeLock: OPENING InterruptedException: " + this);
936                        }
937                    }
938                }
939                // Loop back
940                break;
941
942            case ESTABLISHED:
943                synchronized (writeEvent) {
944                    if (!writeLocked) {
945                        writeLocked = true;
946                        return;
947                    }
948
949                    try {
950                        // do not stay here too long if state != ESTABLISHED
951                        // Bug 4752117
952                        while (state == ESTABLISHED && writeLocked) {
953                            writeEvent.wait(100);
954                        }
955                    } catch (InterruptedException ie) {
956                        if (orb.transportDebugFlag) {
957                            dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
958                        }
959                    }
960                }
961                // Loop back
962                break;
963
964                //
965                // XXX
966                // Need to distinguish between client and server roles
967                // here probably.
968                //
969            case ABORT:
970                synchronized ( stateEvent ){
971                    if (state != ABORT) {
972                        break;
973                    }
974                    throw wrapper.writeErrorSend() ;
975                }
976
977            case CLOSE_RECVD:
978                // the connection has been closed or closing
979                // ==> throw rebind exception
980                synchronized ( stateEvent ){
981                    if (state != CLOSE_RECVD) {
982                        break;
983                    }
984                    throw wrapper.connectionCloseRebind() ;
985                }
986
987            default:
988                if (orb.transportDebugFlag) {
989                    dprint(".writeLock: default: " + this);
990                }
991                // REVISIT
992                throw new RuntimeException(".writeLock: bad state");
993            }
994        }
995      } finally {
996        if (dprintWriteLocks && orb.transportDebugFlag) {
997            dprint(".writeLock<-: " + this);
998        }
999      }
1000    }
1001
1002    public void writeUnlock()
1003    {
1004        try {
1005            if (dprintWriteLocks && orb.transportDebugFlag) {
1006                dprint(".writeUnlock->: " + this);
1007            }
1008            synchronized (writeEvent) {
1009                writeLocked = false;
1010                writeEvent.notify(); // wake up one guy waiting to write
1011            }
1012        } finally {
1013            if (dprintWriteLocks && orb.transportDebugFlag) {
1014                dprint(".writeUnlock<-: " + this);
1015            }
1016        }
1017    }
1018
1019    // Assumes the caller handles writeLock and writeUnlock
1020    public void sendWithoutLock(OutputObject outputObject)
1021    {
1022        // Don't we need to check for CloseConnection
1023        // here?  REVISIT
1024
1025        // XREVISIT - Shouldn't the MessageMediator
1026        // be the one to handle writing the data here?
1027
1028        try {
1029
1030            // Write the fragment/message
1031
1032            CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
1033            cdrOutputObject.writeTo(this);
1034            // REVISIT - no flush?
1035            //socket.getOutputStream().flush();
1036
1037        } catch (IOException e1) {
1038
1039            /*
1040             * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
1041             * sending a CancelRequest for regular requests / locate requests
1042             */
1043
1044            // Since IIOPOutputStream's msgheader is set only once, and not
1045            // altered during sending multiple fragments, the original
1046            // msgheader will always have the requestId.
1047            // REVISIT This could be optimized to send a CancelRequest only
1048            // if any fragments had been sent already.
1049
1050            /* REVISIT: MOVE TO SUBCONTRACT
1051            Message msg = os.getMessage();
1052            if (msg.getType() == Message.GIOPRequest ||
1053                    msg.getType() == Message.GIOPLocateRequest) {
1054                GIOPVersion requestVersion = msg.getGIOPVersion();
1055                int requestId = MessageBase.getRequestId(msg);
1056                try {
1057                    sendCancelRequest(requestVersion, requestId);
1058                } catch (IOException e2) {
1059                    // most likely an abortive connection closure.
1060                    // ignore, since nothing more can be done.
1061                    if (orb.transportDebugFlag) {
1062
1063                }
1064            }
1065            */
1066
1067            // REVISIT When a send failure happens, purgeCalls() need to be
1068            // called to ensure that the connection is properly removed from
1069            // further usage (ie., cancelling pending requests with COMM_FAILURE
1070            // with an appropriate minor_code CompletionStatus.MAY_BE).
1071
1072            // Relying on the IIOPOutputStream (as noted below) is not
1073            // sufficient as it handles COMM_FAILURE only for the final
1074            // fragment (during invoke processing). Note that COMM_FAILURE could
1075            // happen while sending the initial fragments.
1076            // Also the IIOPOutputStream does not properly close the connection.
1077            // It simply removes the connection from the table. An orderly
1078            // closure is needed (ie., cancel pending requests on the connection
1079            // COMM_FAILURE as well.
1080
1081            // IIOPOutputStream will cleanup the connection info when it
1082            // sees this exception.
1083            SystemException exc = wrapper.writeErrorSend(e1);
1084            purgeCalls(exc, false, true);
1085            throw exc;
1086        }
1087    }
1088
1089    public void registerWaiter(MessageMediator messageMediator)
1090    {
1091        responseWaitingRoom.registerWaiter(messageMediator);
1092    }
1093
1094    public void unregisterWaiter(MessageMediator messageMediator)
1095    {
1096        responseWaitingRoom.unregisterWaiter(messageMediator);
1097    }
1098
1099    public InputObject waitForResponse(MessageMediator messageMediator)
1100    {
1101        return responseWaitingRoom.waitForResponse(messageMediator);
1102    }
1103
1104    public void setConnectionCache(ConnectionCache connectionCache)
1105    {
1106        this.connectionCache = connectionCache;
1107    }
1108
1109    public ConnectionCache getConnectionCache()
1110    {
1111        return connectionCache;
1112    }
1113
1114    ////////////////////////////////////////////////////
1115    //
1116    // EventHandler methods
1117    //
1118
1119    public void setUseSelectThreadToWait(boolean x)
1120    {
1121        useSelectThreadToWait = x;
1122        // REVISIT - Reading of a GIOP header only is information
1123        //           that should be passed into the constructor
1124        //           from the SocketOrChannelConnection factory.
1125        setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
1126    }
1127
1128    public void handleEvent()
1129    {
1130        if (orb.transportDebugFlag) {
1131            dprint(".handleEvent->: " + this);
1132        }
1133        getSelectionKey().interestOps(getSelectionKey().interestOps() &
1134                                      (~ getInterestOps()));
1135
1136        if (shouldUseWorkerThreadForEvent()) {
1137            Throwable throwable = null;
1138            try {
1139                int poolToUse = 0;
1140                if (shouldReadGiopHeaderOnly()) {
1141                    partialMessageMediator = readBits();
1142                    poolToUse =
1143                        partialMessageMediator.getThreadPoolToUse();
1144                }
1145
1146                if (orb.transportDebugFlag) {
1147                    dprint(".handleEvent: addWork to pool: " + poolToUse);
1148                }
1149                orb.getThreadPoolManager().getThreadPool(poolToUse)
1150                    .getWorkQueue(0).addWork(getWork());
1151            } catch (NoSuchThreadPoolException e) {
1152                throwable = e;
1153            } catch (NoSuchWorkQueueException e) {
1154                throwable = e;
1155            }
1156            // REVISIT: need to close connection.
1157            if (throwable != null) {
1158                if (orb.transportDebugFlag) {
1159                    dprint(".handleEvent: " + throwable);
1160                }
1161                INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
1162                i.initCause(throwable);
1163                throw i;
1164            }
1165        } else {
1166            if (orb.transportDebugFlag) {
1167                dprint(".handleEvent: doWork");
1168            }
1169            getWork().doWork();
1170        }
1171        if (orb.transportDebugFlag) {
1172            dprint(".handleEvent<-: " + this);
1173        }
1174    }
1175
1176    public SelectableChannel getChannel()
1177    {
1178        return socketChannel;
1179    }
1180
1181    public int getInterestOps()
1182    {
1183        return SelectionKey.OP_READ;
1184    }
1185
1186    //    public Acceptor getAcceptor() - already defined above.
1187
    /**
     * @return this object, which is itself the connection
     */
    public Connection getConnection()
    {
        return this;
    }
1192
1193    ////////////////////////////////////////////////////
1194    //
1195    // Work methods.
1196    //
1197
1198    public String getName()
1199    {
1200        return this.toString();
1201    }
1202
1203    public void doWork()
1204    {
1205        try {
1206            if (orb.transportDebugFlag) {
1207                dprint(".doWork->: " + this);
1208            }
1209
1210            // IMPORTANT: Sanity checks on SelectionKeys such as
1211            //            SelectorKey.isValid() should not be done
1212            //            here.
1213            //
1214
1215            if (!shouldReadGiopHeaderOnly()) {
1216                read();
1217            }
1218            else {
1219                // get the partialMessageMediator
1220                // created by SelectorThread
1221                CorbaMessageMediator messageMediator =
1222                                         this.getPartialMessageMediator();
1223
1224                // read remaining info needed in a MessageMediator
1225                messageMediator = finishReadingBits(messageMediator);
1226
1227                if (messageMediator != null) {
1228                    // Null can happen when client closes stream
1229                    // causing purgecalls.
1230                    dispatch(messageMediator);
1231                }
1232            }
1233        } catch (Throwable t) {
1234            if (orb.transportDebugFlag) {
1235                dprint(".doWork: ignoring Throwable: "
1236                       + t
1237                       + " " + this);
1238            }
1239        } finally {
1240            if (orb.transportDebugFlag) {
1241                dprint(".doWork<-: " + this);
1242            }
1243        }
1244    }
1245
1246    public void setEnqueueTime(long timeInMillis)
1247    {
1248        enqueueTime = timeInMillis;
1249    }
1250
1251    public long getEnqueueTime()
1252    {
1253        return enqueueTime;
1254    }
1255
1256    ////////////////////////////////////////////////////
1257    //
1258    // spi.transport.CorbaConnection.
1259    //
1260
1261    // IMPORTANT: Reader Threads must NOT read Giop header only.
1262    public boolean shouldReadGiopHeaderOnly() {
1263        return shouldReadGiopHeaderOnly;
1264    }
1265
1266    protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
1267        shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
1268    }
1269
1270    public ResponseWaitingRoom getResponseWaitingRoom()
1271    {
1272        return responseWaitingRoom;
1273    }
1274
    // REVISIT - interface defines isServer but it is already defined in
    // a higher interface.
1277
1278    public void serverRequestMapPut(int requestId,
1279                                    CorbaMessageMediator messageMediator)
1280    {
1281        serverRequestMap.put(new Integer(requestId), messageMediator);
1282    }
1283
1284    public CorbaMessageMediator serverRequestMapGet(int requestId)
1285    {
1286        return (CorbaMessageMediator)
1287            serverRequestMap.get(new Integer(requestId));
1288    }
1289
1290    public void serverRequestMapRemove(int requestId)
1291    {
1292        serverRequestMap.remove(new Integer(requestId));
1293    }
1294
1295
1296    // REVISIT: this is also defined in:
1297    // com.sun.corba.se.spi.legacy.connection.Connection
1298    public java.net.Socket getSocket()
1299    {
1300        return socket;
1301    }
1302
1303    /** It is possible for a Close Connection to have been
1304     ** sent here, but we will not check for this. A "lazy"
1305     ** Exception will be thrown in the Worker thread after the
1306     ** incoming request has been processed even though the connection
1307     ** is closed before the request is processed. This is o.k because
1308     ** it is a boundary condition. To prevent it we would have to add
1309     ** more locks which would reduce performance in the normal case.
1310     **/
1311    public synchronized void serverRequestProcessingBegins()
1312    {
1313        serverRequestCount++;
1314    }
1315
1316    public synchronized void serverRequestProcessingEnds()
1317    {
1318        serverRequestCount--;
1319    }
1320
1321    //
1322    //
1323    //
1324
1325    public synchronized int getNextRequestId()
1326    {
1327        return requestId++;
1328    }
1329
    /**
     * Negotiated code sets for char and wchar data; {@code null} until
     * {@code setCodeSetContext} stores a value.
     */
    protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
1332
1333    public ORB getBroker()
1334    {
1335        return orb;
1336    }
1337
1338    public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
1339        // Needs to be synchronized for the following case when the client
1340        // doesn't send the code set context twice, and we have two threads
1341        // in ServerRequestDispatcher processCodeSetContext.
1342        //
1343        // Thread A checks to see if there is a context, there is none, so
1344        //     it calls setCodeSetContext, getting the synch lock.
1345        // Thread B checks to see if there is a context.  If we didn't synch,
1346        //     it might decide to outlaw wchar/wstring.
1347        if (codeSetContext == null) {
1348            synchronized(this) {
1349                return codeSetContext;
1350            }
1351        }
1352
1353        return codeSetContext;
1354    }
1355
1356    public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
1357        // Double check whether or not we need to do this
1358        if (codeSetContext == null) {
1359
1360            if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
1361                OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
1362                // If the client says it's negotiated a code set that
1363                // isn't a fallback and we never said we support, then
1364                // it has a bug.
1365                throw wrapper.badCodesetsFromClient() ;
1366            }
1367
1368            codeSetContext = csc;
1369        }
1370    }
1371
1372    //
1373    // from iiop.IIOPConnection.java
1374    //
1375
1376    // Map request ID to an InputObject.
1377    // This is so the client thread can start unmarshaling
1378    // the reply and remove it from the out_calls map while the
1379    // ReaderThread can still obtain the input stream to give
1380    // new fragments.  Only the ReaderThread touches the clientReplyMap,
1381    // so it doesn't incur synchronization overhead.
1382
1383    public MessageMediator clientRequestMapGet(int requestId)
1384    {
1385        return responseWaitingRoom.getMessageMediator(requestId);
1386    }
1387
    /** Single mediator slot managed by the clientReply_1_1_Put/Get/Remove methods. */
    protected MessageMediator clientReply_1_1;
1389
1390    public void clientReply_1_1_Put(MessageMediator x)
1391    {
1392        clientReply_1_1 = x;
1393    }
1394
1395    public MessageMediator clientReply_1_1_Get()
1396    {
1397        return  clientReply_1_1;
1398    }
1399
1400    public void clientReply_1_1_Remove()
1401    {
1402        clientReply_1_1 = null;
1403    }
1404
    /** Single mediator slot managed by the serverRequest_1_1_Put/Get/Remove methods. */
    protected MessageMediator serverRequest_1_1;
1406
1407    public void serverRequest_1_1_Put(MessageMediator x)
1408    {
1409        serverRequest_1_1 = x;
1410    }
1411
1412    public MessageMediator serverRequest_1_1_Get()
1413    {
1414        return  serverRequest_1_1;
1415    }
1416
1417    public void serverRequest_1_1_Remove()
1418    {
1419        serverRequest_1_1 = null;
1420    }
1421
1422    protected String getStateString( int state )
1423    {
1424        synchronized ( stateEvent ){
1425            switch (state) {
1426            case OPENING : return "OPENING" ;
1427            case ESTABLISHED : return "ESTABLISHED" ;
1428            case CLOSE_SENT : return "CLOSE_SENT" ;
1429            case CLOSE_RECVD : return "CLOSE_RECVD" ;
1430            case ABORT : return "ABORT" ;
1431            default : return "???" ;
1432            }
1433        }
1434    }
1435
1436    public synchronized boolean isPostInitialContexts() {
1437        return postInitialContexts;
1438    }
1439
1440    // Can never be unset...
1441    public synchronized void setPostInitialContexts(){
1442        postInitialContexts = true;
1443    }
1444
1445    /**
1446     * Wake up the outstanding requests on the connection, and hand them
1447     * COMM_FAILURE exception with a given minor code.
1448     *
1449     * Also, delete connection from connection table and
1450     * stop the reader thread.
1451
1452     * Note that this should only ever be called by the Reader thread for
1453     * this connection.
1454     *
1455     * @param minor_code The minor code for the COMM_FAILURE major code.
1456     * @param die Kill the reader thread (this thread) before exiting.
1457     */
1458    public void purgeCalls(SystemException systemException,
1459                           boolean die, boolean lockHeld)
1460    {
1461        int minor_code = systemException.minor;
1462
1463        try{
1464            if (orb.transportDebugFlag) {
1465                dprint(".purgeCalls->: "
1466                       + minor_code + "/" + die + "/" + lockHeld
1467                       + " " + this);
1468            }
1469
1470            // If this invocation is a result of ThreadDeath caused
1471            // by a previous execution of this routine, just exit.
1472
1473            synchronized ( stateEvent ){
1474                if ((state == ABORT) || (state == CLOSE_RECVD)) {
1475                    if (orb.transportDebugFlag) {
1476                        dprint(".purgeCalls: exiting since state is: "
1477                               + getStateString(state)
1478                               + " " + this);
1479                    }
1480                    return;
1481                }
1482            }
1483
1484            // Grab the writeLock (freeze the calls)
1485            try {
1486                if (!lockHeld) {
1487                    writeLock();
1488                }
1489            } catch (SystemException ex) {
1490                if (orb.transportDebugFlag)
1491                    dprint(".purgeCalls: SystemException" + ex
1492                           + "; continuing " + this);
1493            }
1494
1495            // Mark the state of the connection
1496            // and determine the request status
1497            org.omg.CORBA.CompletionStatus completion_status;
1498            synchronized ( stateEvent ){
1499                if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
1500                    state = CLOSE_RECVD;
1501                    systemException.completed = CompletionStatus.COMPLETED_NO;
1502                } else {
1503                    state = ABORT;
1504                    systemException.completed = CompletionStatus.COMPLETED_MAYBE;
1505                }
1506                stateEvent.notifyAll();
1507            }
1508
1509            try {
1510                socket.getInputStream().close();
1511                socket.getOutputStream().close();
1512                socket.close();
1513            } catch (Exception ex) {
1514                if (orb.transportDebugFlag) {
1515                    dprint(".purgeCalls: Exception closing socket: " + ex
1516                           + " " + this);
1517                }
1518            }
1519
1520            // Signal all threads with outstanding requests on this
1521            // connection and give them the SystemException;
1522
1523            responseWaitingRoom.signalExceptionToAllWaiters(systemException);
1524        } finally {
1525            if (contactInfo != null) {
1526                ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
1527            } else if (acceptor != null) {
1528                ((InboundConnectionCache)getConnectionCache()).remove(this);
1529            }
1530
1531            //
1532            // REVISIT: Stop the reader thread
1533            //
1534
1535            // Signal all the waiters of the writeLock.
1536            // There are 4 types of writeLock waiters:
1537            // 1. Send waiters:
1538            // 2. SendReply waiters:
1539            // 3. cleanUp waiters:
1540            // 4. purge_call waiters:
1541            //
1542
1543            writeUnlock();
1544
1545            if (orb.transportDebugFlag) {
1546                dprint(".purgeCalls<-: "
1547                       + minor_code + "/" + die + "/" + lockHeld
1548                       + " " + this);
1549            }
1550        }
1551    }
1552
1553    /*************************************************************************
1554    * The following methods are for dealing with Connection cleaning for
1555    * better scalability of servers in high network load conditions.
1556    **************************************************************************/
1557
1558    public void sendCloseConnection(GIOPVersion giopVersion)
1559        throws IOException
1560    {
1561        Message msg = MessageBase.createCloseConnection(giopVersion);
1562        sendHelper(giopVersion, msg);
1563    }
1564
1565    public void sendMessageError(GIOPVersion giopVersion)
1566        throws IOException
1567    {
1568        Message msg = MessageBase.createMessageError(giopVersion);
1569        sendHelper(giopVersion, msg);
1570    }
1571
1572    /**
1573     * Send a CancelRequest message. This does not lock the connection, so the
1574     * caller needs to ensure this method is called appropriately.
1575     * @exception IOException - could be due to abortive connection closure.
1576     */
1577    public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
1578        throws IOException
1579    {
1580
1581        Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
1582        sendHelper(giopVersion, msg);
1583    }
1584
1585    protected void sendHelper(GIOPVersion giopVersion, Message msg)
1586        throws IOException
1587    {
1588        // REVISIT: See comments in CDROutputObject constructor.
1589        CDROutputObject outputObject =
1590            sun.corba.OutputStreamFactory.newCDROutputObject((ORB)orb, null, giopVersion,
1591                                this, msg, ORBConstants.STREAM_FORMAT_VERSION_1);
1592        msg.write(outputObject);
1593
1594        outputObject.writeTo(this);
1595    }
1596
1597    public void sendCancelRequestWithLock(GIOPVersion giopVersion,
1598                                          int requestId)
1599        throws IOException
1600    {
1601        writeLock();
1602        try {
1603            sendCancelRequest(giopVersion, requestId);
1604        } finally {
1605            writeUnlock();
1606        }
1607    }
1608
1609    // Begin Code Base methods ---------------------------------------
1610    //
1611    // Set this connection's code base IOR.  The IOR comes from the
1612    // SendingContext.  This is an optional service context, but all
1613    // JavaSoft ORBs send it.
1614    //
1615    // The set and get methods don't need to be synchronized since the
1616    // first possible get would occur during reading a valuetype, and
1617    // that would be after the set.
1618
1619    // Sets this connection's code base IOR.  This is done after
1620    // getting the IOR out of the SendingContext service context.
1621    // Our ORBs always send this, but it's optional in CORBA.
1622
1623    public final void setCodeBaseIOR(IOR ior) {
1624        codeBaseServerIOR = ior;
1625    }
1626
1627    public final IOR getCodeBaseIOR() {
1628        return codeBaseServerIOR;
1629    }
1630
1631    // Get a CodeBase stub to use in unmarshaling.  The CachedCodeBase
1632    // won't connect to the remote codebase unless it's necessary.
1633    public final CodeBase getCodeBase() {
1634        return cachedCodeBase;
1635    }
1636
1637    // End Code Base methods -----------------------------------------
1638
1639    // set transport read thresholds
1640    protected void setReadTimeouts(ReadTimeouts readTimeouts) {
1641        this.readTimeouts = readTimeouts;
1642    }
1643
1644    protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
1645        partialMessageMediator = messageMediator;
1646    }
1647
1648    protected CorbaMessageMediator getPartialMessageMediator() {
1649        return partialMessageMediator;
1650    }
1651
1652    public String toString()
1653    {
1654        synchronized ( stateEvent ){
1655            return
1656                "SocketOrChannelConnectionImpl[" + " "
1657                + (socketChannel == null ?
1658                   socket.toString() : socketChannel.toString()) + " "
1659                + getStateString( state ) + " "
1660                + shouldUseSelectThreadToWait() + " "
1661                + shouldUseWorkerThreadForEvent() + " "
1662                + shouldReadGiopHeaderOnly()
1663                + "]" ;
1664        }
1665    }
1666
1667    // Must be public - used in encoding.
1668    public void dprint(String msg)
1669    {
1670        ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
1671    }
1672
1673    protected void dprint(String msg, Throwable t)
1674    {
1675        dprint(msg);
1676        t.printStackTrace(System.out);
1677    }
1678}
1679
1680// End of file.
1681