SocketOrChannelConnectionImpl.java revision 744:a98f572e2ceb
1/*
2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.transport;
27
28import java.io.IOException;
29import java.net.InetSocketAddress;
30import java.net.Socket;
31import java.nio.ByteBuffer;
32import java.nio.channels.SelectableChannel;
33import java.nio.channels.SelectionKey;
34import java.nio.channels.SocketChannel;
35import java.security.AccessController;
36import java.security.PrivilegedAction;
37import java.util.Collections;
38import java.util.Hashtable;
39import java.util.HashMap;
40import java.util.Map;
41
42import org.omg.CORBA.COMM_FAILURE;
43import org.omg.CORBA.CompletionStatus;
44import org.omg.CORBA.DATA_CONVERSION;
45import org.omg.CORBA.INTERNAL;
46import org.omg.CORBA.MARSHAL;
47import org.omg.CORBA.OBJECT_NOT_EXIST;
48import org.omg.CORBA.SystemException;
49
50import com.sun.org.omg.SendingContext.CodeBase;
51
52import com.sun.corba.se.pept.broker.Broker;
53import com.sun.corba.se.pept.encoding.InputObject;
54import com.sun.corba.se.pept.encoding.OutputObject;
55import com.sun.corba.se.pept.protocol.MessageMediator;
56import com.sun.corba.se.pept.transport.Acceptor;
57import com.sun.corba.se.pept.transport.Connection;
58import com.sun.corba.se.pept.transport.ConnectionCache;
59import com.sun.corba.se.pept.transport.ContactInfo;
60import com.sun.corba.se.pept.transport.EventHandler;
61import com.sun.corba.se.pept.transport.InboundConnectionCache;
62import com.sun.corba.se.pept.transport.OutboundConnectionCache;
63import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
64import com.sun.corba.se.pept.transport.Selector;
65
66import com.sun.corba.se.spi.ior.IOR;
67import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
68import com.sun.corba.se.spi.logging.CORBALogDomains;
69import com.sun.corba.se.spi.orb.ORB ;
70import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
71import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
72import com.sun.corba.se.spi.orbutil.threadpool.Work;
73import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
74import com.sun.corba.se.spi.transport.CorbaContactInfo;
75import com.sun.corba.se.spi.transport.CorbaConnection;
76import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
77import com.sun.corba.se.spi.transport.ReadTimeouts;
78
79import com.sun.corba.se.impl.encoding.CachedCodeBase;
80import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
81import com.sun.corba.se.impl.encoding.CDROutputObject;
82import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
83import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
84import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
85import com.sun.corba.se.impl.logging.ORBUtilSystemException;
86import com.sun.corba.se.impl.orbutil.ORBConstants;
87import com.sun.corba.se.impl.orbutil.ORBUtility;
88import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
89import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
90import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
91
92/**
93 * @author Harold Carr
94 */
95public class SocketOrChannelConnectionImpl
96    extends
97        EventHandlerBase
98    implements
99        CorbaConnection,
100        Work
101{
102    public static boolean dprintWriteLocks = false; // NOTE(review): not read anywhere in the visible code; presumably a debug switch for write-lock tracing -- confirm.
103
104    //
105    // New transport.
106    //
107
108    protected long enqueueTime; // NOTE(review): never assigned in the visible code; presumably stamped when this Work item is queued -- confirm.
109
    // NIO channel obtained from socket.getChannel() in the constructors;
    // null when the socket is not channel-backed.
110    protected SocketChannel socketChannel;
111    public SocketChannel getSocketChannel()
112    {
113        return socketChannel;
114    }
115
116    // REVISIT:
117    // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
    // contactInfo is assigned by the client constructor and acceptor by the
    // server-side constructor; readBits()/finishReadingBits() consult
    // contactInfo first and fall back to acceptor.
118    protected CorbaContactInfo contactInfo;
119    protected Acceptor acceptor;
120    protected ConnectionCache connectionCache; // NOTE(review): not touched in the visible code; presumably what getConnectionCache() returns -- confirm.
121
122    //
123    // From iiop.Connection.java
124    //
125
126    protected Socket socket;    // The socket used for this connection.
127    protected long timeStamp = 0; // NOTE(review): not touched in the visible code; presumably maintained through the connection cache's stampTime() -- confirm.
128    protected boolean isServer = false; // Set to true by the server-side constructor.
129
130    // Start at some value other than zero since this is a magic
131    // value in some protocols.
132    protected int requestId = 5;
133    protected CorbaResponseWaitingRoom responseWaitingRoom; // Created in the ORB-only base constructor.
134    protected int state; // Assigned OPENING / ESTABLISHED by the constructors and CLOSE_SENT by close().
135    protected java.lang.Object stateEvent = new java.lang.Object(); // Monitor used by close(): state is changed and notifyAll() is called while holding it.
136    protected java.lang.Object writeEvent = new java.lang.Object(); // NOTE(review): presumably the monitor behind writeLock()/writeUnlock() -- confirm.
137    protected boolean writeLocked; // NOTE(review): presumably the flag toggled by writeLock()/writeUnlock() -- confirm.
138    protected int serverRequestCount = 0; // NOTE(review): not touched in the visible code -- confirm usage.
139
140    // Server request map: used on the server side of Connection
141    // Maps request ID to IIOPInputStream.
    // Created as a synchronized HashMap by the server-side constructor; the
    // client constructors shown here leave it null.
142    Map serverRequestMap = null;
143
144    // This is a flag associated per connection telling us if the
145    // initial set of sending contexts were sent to the receiver
146    // already...
147    protected boolean postInitialContexts = false;
148
149    // Remote reference to CodeBase server (supplies
150    // FullValueDescription, among other things)
151    protected IOR codeBaseServerIOR;
152
153    // CodeBase cache for this connection.  This will cache remote operations,
154    // handle connecting, and ensure we don't do any remote operations until
155    // necessary.
156    protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);
157
    // Exception/log wrapper obtained for the RPC_TRANSPORT log domain in the
    // ORB-only base constructor.
158    protected ORBUtilSystemException wrapper ;
159
160    // transport read timeout values
    // (the readFully() methods take their initial wait time and backoff
    // factor from this object)
161    protected ReadTimeouts readTimeouts;
162
163    protected boolean shouldReadGiopHeaderOnly;
164
165    // A message mediator used when shouldReadGiopHeaderOnly is
166    // true to maintain request message state across execution in a
167    // SelectorThread and WorkerThread.
168    protected CorbaMessageMediator partialMessageMediator = null;
169
170    // Used in genericRPCMSGFramework test.
171    protected SocketOrChannelConnectionImpl(ORB orb)
172    {
173        this.orb = orb;
174        wrapper = ORBUtilSystemException.get( orb,
175            CORBALogDomains.RPC_TRANSPORT ) ;
176
177        setWork(this);
178        responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
179        setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
180    }
181
182    // Both client and servers.
183    protected SocketOrChannelConnectionImpl(ORB orb,
184                                            boolean useSelectThreadToWait,
185                                            boolean useWorkerThread)
186    {
187        this(orb) ;
188        setUseSelectThreadToWait(useSelectThreadToWait);
189        setUseWorkerThreadForEvent(useWorkerThread);
190    }
191
192    // Client constructor.
193    public SocketOrChannelConnectionImpl(ORB orb,
194                                         CorbaContactInfo contactInfo,
195                                         boolean useSelectThreadToWait,
196                                         boolean useWorkerThread,
197                                         String socketType,
198                                         String hostname,
199                                         int port)
200    {
201        this(orb, useSelectThreadToWait, useWorkerThread);
202
203        this.contactInfo = contactInfo;
204
205        try {
206            socket = orb.getORBData().getSocketFactory()
207                .createSocket(socketType,
208                              new InetSocketAddress(hostname, port));
209            socketChannel = socket.getChannel();
210
211            if (socketChannel != null) {
212                boolean isBlocking = !useSelectThreadToWait;
213                socketChannel.configureBlocking(isBlocking);
214            } else {
215                // IMPORTANT: non-channel-backed sockets must use
216                // dedicated reader threads.
217                setUseSelectThreadToWait(false);
218            }
219            if (orb.transportDebugFlag) {
220                dprint(".initialize: connection created: " + socket);
221            }
222        } catch (Throwable t) {
223            throw wrapper.connectFailure(t, socketType, hostname,
224                                         Integer.toString(port));
225        }
226        state = OPENING;
227    }
228
229    // Client-side convenience.
230    public SocketOrChannelConnectionImpl(ORB orb,
231                                         CorbaContactInfo contactInfo,
232                                         String socketType,
233                                         String hostname,
234                                         int port)
235    {
236        this(orb, contactInfo,
237             orb.getORBData().connectionSocketUseSelectThreadToWait(),
238             orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
239             socketType, hostname, port);
240    }
241
242    // Server-side constructor.
243    public SocketOrChannelConnectionImpl(ORB orb,
244                                         Acceptor acceptor,
245                                         Socket socket,
246                                         boolean useSelectThreadToWait,
247                                         boolean useWorkerThread)
248    {
249        this(orb, useSelectThreadToWait, useWorkerThread);
250
251        this.socket = socket;
252        socketChannel = socket.getChannel();
253        if (socketChannel != null) {
254            // REVISIT
255            try {
256                boolean isBlocking = !useSelectThreadToWait;
257                socketChannel.configureBlocking(isBlocking);
258            } catch (IOException e) {
259                RuntimeException rte = new RuntimeException();
260                rte.initCause(e);
261                throw rte;
262            }
263        }
264        this.acceptor = acceptor;
265
266        serverRequestMap = Collections.synchronizedMap(new HashMap());
267        isServer = true;
268
269        state = ESTABLISHED;
270    }
271
272    // Server-side convenience
273    public SocketOrChannelConnectionImpl(ORB orb,
274                                         Acceptor acceptor,
275                                         Socket socket)
276    {
277        this(orb, acceptor, socket,
278             (socket.getChannel() == null
279              ? false
280              : orb.getORBData().connectionSocketUseSelectThreadToWait()),
281             (socket.getChannel() == null
282              ? false
283              : orb.getORBData().connectionSocketUseWorkerThreadForEvent()));
284    }
285
286    ////////////////////////////////////////////////////
287    //
288    // framework.transport.Connection
289    //
290
291    public boolean shouldRegisterReadEvent()
292    {
293        return true;
294    }
295
296    public boolean shouldRegisterServerReadEvent()
297    {
298        return true;
299    }
300
301    public boolean read()
302    {
303        try {
304            if (orb.transportDebugFlag) {
305                dprint(".read->: " + this);
306            }
307            CorbaMessageMediator messageMediator = readBits();
308            if (messageMediator != null) {
309                // Null can happen when client closes stream
310                // causing purgecalls.
311                return dispatch(messageMediator);
312            }
313            return true;
314        } finally {
315            if (orb.transportDebugFlag) {
316                dprint(".read<-: " + this);
317            }
318        }
319    }
320
321    protected CorbaMessageMediator readBits()
322    {
323        try {
324
325            if (orb.transportDebugFlag) {
326                dprint(".readBits->: " + this);
327            }
328
329            MessageMediator messageMediator;
330            // REVISIT - use common factory base class.
331            if (contactInfo != null) {
332                messageMediator =
333                    contactInfo.createMessageMediator(orb, this);
334            } else if (acceptor != null) {
335                messageMediator = acceptor.createMessageMediator(orb, this);
336            } else {
337                throw
338                    new RuntimeException("SocketOrChannelConnectionImpl.readBits");
339            }
340            return (CorbaMessageMediator) messageMediator;
341
342        } catch (ThreadDeath td) {
343            if (orb.transportDebugFlag) {
344                dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
345            }
346            try {
347                purgeCalls(wrapper.connectionAbort(td), false, false);
348            } catch (Throwable t) {
349                if (orb.transportDebugFlag) {
350                    dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
351                }
352            }
353            throw td;
354        } catch (Throwable ex) {
355            if (orb.transportDebugFlag) {
356                dprint(".readBits: " + this + ": Throwable: " + ex, ex);
357            }
358
359            try {
360                if (ex instanceof INTERNAL) {
361                    sendMessageError(GIOPVersion.DEFAULT_VERSION);
362                }
363            } catch (IOException e) {
364                if (orb.transportDebugFlag) {
365                    dprint(".readBits: " + this +
366                           ": sendMessageError: IOException: " + e, e);
367                }
368            }
369            // REVISIT - make sure reader thread is killed.
370            Selector selector = orb.getTransportManager().getSelector(0);
371            if (selector != null) {
372                selector.unregisterForEvent(this);
373            }
374            // Notify anyone waiting.
375            purgeCalls(wrapper.connectionAbort(ex), true, false);
376            // REVISIT
377            //keepRunning = false;
378            // REVISIT - if this is called after purgeCalls then
379            // the state of the socket is ABORT so the writeLock
380            // in close throws an exception.  It is ignored but
381            // causes IBM (screen scraping) tests to fail.
382            //close();
383        } finally {
384            if (orb.transportDebugFlag) {
385                dprint(".readBits<-: " + this);
386            }
387        }
388        return null;
389    }
390
391    protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
392    {
393        try {
394
395            if (orb.transportDebugFlag) {
396                dprint(".finishReadingBits->: " + this);
397            }
398
399            // REVISIT - use common factory base class.
400            if (contactInfo != null) {
401                messageMediator =
402                    contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
403            } else if (acceptor != null) {
404                messageMediator =
405                    acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
406            } else {
407                throw
408                    new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
409            }
410            return (CorbaMessageMediator) messageMediator;
411
412        } catch (ThreadDeath td) {
413            if (orb.transportDebugFlag) {
414                dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
415            }
416            try {
417                purgeCalls(wrapper.connectionAbort(td), false, false);
418            } catch (Throwable t) {
419                if (orb.transportDebugFlag) {
420                    dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
421                }
422            }
423            throw td;
424        } catch (Throwable ex) {
425            if (orb.transportDebugFlag) {
426                dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
427            }
428
429            try {
430                if (ex instanceof INTERNAL) {
431                    sendMessageError(GIOPVersion.DEFAULT_VERSION);
432                }
433            } catch (IOException e) {
434                if (orb.transportDebugFlag) {
435                    dprint(".finishReadingBits: " + this +
436                           ": sendMessageError: IOException: " + e, e);
437                }
438            }
439            // REVISIT - make sure reader thread is killed.
440            orb.getTransportManager().getSelector(0).unregisterForEvent(this);
441            // Notify anyone waiting.
442            purgeCalls(wrapper.connectionAbort(ex), true, false);
443            // REVISIT
444            //keepRunning = false;
445            // REVISIT - if this is called after purgeCalls then
446            // the state of the socket is ABORT so the writeLock
447            // in close throws an exception.  It is ignored but
448            // causes IBM (screen scraping) tests to fail.
449            //close();
450        } finally {
451            if (orb.transportDebugFlag) {
452                dprint(".finishReadingBits<-: " + this);
453            }
454        }
455        return null;
456    }
457
458    protected boolean dispatch(CorbaMessageMediator messageMediator)
459    {
460        try {
461            if (orb.transportDebugFlag) {
462                dprint(".dispatch->: " + this);
463            }
464
465            //
466            // NOTE:
467            //
468            // This call is the transition from the tranport block
469            // to the protocol block.
470            //
471
472            boolean result =
473                messageMediator.getProtocolHandler()
474                .handleRequest(messageMediator);
475
476            return result;
477
478        } catch (ThreadDeath td) {
479            if (orb.transportDebugFlag) {
480                dprint(".dispatch: ThreadDeath", td );
481            }
482            try {
483                purgeCalls(wrapper.connectionAbort(td), false, false);
484            } catch (Throwable t) {
485                if (orb.transportDebugFlag) {
486                    dprint(".dispatch: purgeCalls: Throwable", t);
487                }
488            }
489            throw td;
490        } catch (Throwable ex) {
491            if (orb.transportDebugFlag) {
492                dprint(".dispatch: Throwable", ex ) ;
493            }
494
495            try {
496                if (ex instanceof INTERNAL) {
497                    sendMessageError(GIOPVersion.DEFAULT_VERSION);
498                }
499            } catch (IOException e) {
500                if (orb.transportDebugFlag) {
501                    dprint(".dispatch: sendMessageError: IOException", e);
502                }
503            }
504            purgeCalls(wrapper.connectionAbort(ex), false, false);
505            // REVISIT
506            //keepRunning = false;
507        } finally {
508            if (orb.transportDebugFlag) {
509                dprint(".dispatch<-: " + this);
510            }
511        }
512
513        return true;
514    }
515
516    public boolean shouldUseDirectByteBuffers()
517    {
518        return getSocketChannel() != null;
519    }
520
521    public ByteBuffer read(int size, int offset, int length, long max_wait_time)
522        throws IOException
523    {
524        if (shouldUseDirectByteBuffers()) {
525
526            ByteBuffer byteBuffer =
527                orb.getByteBufferPool().getByteBuffer(size);
528
529            if (orb.transportDebugFlag) {
530                // print address of ByteBuffer gotten from pool
531                int bbAddress = System.identityHashCode(byteBuffer);
532                StringBuffer sb = new StringBuffer(80);
533                sb.append(".read: got ByteBuffer id (");
534                sb.append(bbAddress).append(") from ByteBufferPool.");
535                String msgStr = sb.toString();
536                dprint(msgStr);
537            }
538
539            byteBuffer.position(offset);
540            byteBuffer.limit(size);
541
542            readFully(byteBuffer, length, max_wait_time);
543
544            return byteBuffer;
545        }
546
547        byte[] buf = new byte[size];
548        readFully(getSocket().getInputStream(), buf,
549                  offset, length, max_wait_time);
550        ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
551        byteBuffer.limit(size);
552        return byteBuffer;
553    }
554
555    public ByteBuffer read(ByteBuffer byteBuffer, int offset,
556                           int length, long max_wait_time)
557        throws IOException
558    {
559        int size = offset + length;
560        if (shouldUseDirectByteBuffers()) {
561
562            if (! byteBuffer.isDirect()) {
563                throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
564            }
565            if (size > byteBuffer.capacity()) {
566                if (orb.transportDebugFlag) {
567                    // print address of ByteBuffer being released
568                    int bbAddress = System.identityHashCode(byteBuffer);
569                    StringBuffer bbsb = new StringBuffer(80);
570                    bbsb.append(".read: releasing ByteBuffer id (")
571                        .append(bbAddress).append(") to ByteBufferPool.");
572                    String bbmsg = bbsb.toString();
573                    dprint(bbmsg);
574                }
575                orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
576                byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
577            }
578            byteBuffer.position(offset);
579            byteBuffer.limit(size);
580            readFully(byteBuffer, length, max_wait_time);
581            byteBuffer.position(0);
582            byteBuffer.limit(size);
583            return byteBuffer;
584        }
585        if (byteBuffer.isDirect()) {
586            throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
587        }
588        byte[] buf = new byte[size];
589        readFully(getSocket().getInputStream(), buf,
590                  offset, length, max_wait_time);
591        return ByteBuffer.wrap(buf);
592    }
593
594    public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
595        throws IOException
596    {
597        int n = 0;
598        int bytecount = 0;
599        long time_to_wait = readTimeouts.get_initial_time_to_wait();
600        long total_time_in_wait = 0;
601
602        // The reading of data incorporates a strategy to detect a
603        // rogue client. The strategy is implemented as follows. As
604        // long as data is being read, at least 1 byte or more, we
605        // assume we have a well behaved client. If no data is read,
606        // then we sleep for a time to wait, re-calculate a new time to
607        // wait which is lengthier than the previous time spent waiting.
608        // Then, if the total time spent waiting does not exceed a
609        // maximum time we are willing to wait, we attempt another
610        // read. If the maximum amount of time we are willing to
611        // spend waiting for more data is exceeded, we throw an
612        // IOException.
613
614        // NOTE: Reading of GIOP headers are treated with a smaller
615        //       maximum time to wait threshold. Based on extensive
616        //       performance testing, all GIOP headers are being
617        //       read in 1 read access.
618
619        do {
620            bytecount = getSocketChannel().read(byteBuffer);
621
622            if (bytecount < 0) {
623                throw new IOException("End-of-stream");
624            }
625            else if (bytecount == 0) {
626                try {
627                    Thread.sleep(time_to_wait);
628                    total_time_in_wait += time_to_wait;
629                    time_to_wait =
630                        (long)(time_to_wait*readTimeouts.get_backoff_factor());
631                }
632                catch (InterruptedException ie) {
633                    // ignore exception
634                    if (orb.transportDebugFlag) {
635                        dprint("readFully(): unexpected exception "
636                                + ie.toString());
637                    }
638                }
639            }
640            else {
641                n += bytecount;
642            }
643        }
644        while (n < size && total_time_in_wait < max_wait_time);
645
646        if (n < size && total_time_in_wait >= max_wait_time)
647        {
648            // failed to read entire message
649            throw wrapper.transportReadTimeoutExceeded(new Integer(size),
650                                      new Integer(n), new Long(max_wait_time),
651                                      new Long(total_time_in_wait));
652        }
653
654        getConnectionCache().stampTime(this);
655    }
656
657    // To support non-channel connections.
658    public void readFully(java.io.InputStream is, byte[] buf,
659                          int offset, int size, long max_wait_time)
660        throws IOException
661    {
662        int n = 0;
663        int bytecount = 0;
664        long time_to_wait = readTimeouts.get_initial_time_to_wait();
665        long total_time_in_wait = 0;
666
667        // The reading of data incorporates a strategy to detect a
668        // rogue client. The strategy is implemented as follows. As
669        // long as data is being read, at least 1 byte or more, we
670        // assume we have a well behaved client. If no data is read,
671        // then we sleep for a time to wait, re-calculate a new time to
672        // wait which is lengthier than the previous time spent waiting.
673        // Then, if the total time spent waiting does not exceed a
674        // maximum time we are willing to wait, we attempt another
675        // read. If the maximum amount of time we are willing to
676        // spend waiting for more data is exceeded, we throw an
677        // IOException.
678
679        // NOTE: Reading of GIOP headers are treated with a smaller
680        //       maximum time to wait threshold. Based on extensive
681        //       performance testing, all GIOP headers are being
682        //       read in 1 read access.
683
684        do {
685            bytecount = is.read(buf, offset + n, size - n);
686            if (bytecount < 0) {
687                throw new IOException("End-of-stream");
688            }
689            else if (bytecount == 0) {
690                try {
691                    Thread.sleep(time_to_wait);
692                    total_time_in_wait += time_to_wait;
693                    time_to_wait =
694                        (long)(time_to_wait*readTimeouts.get_backoff_factor());
695                }
696                catch (InterruptedException ie) {
697                    // ignore exception
698                    if (orb.transportDebugFlag) {
699                        dprint("readFully(): unexpected exception "
700                                + ie.toString());
701                    }
702                }
703            }
704            else {
705                n += bytecount;
706            }
707        }
708        while (n < size && total_time_in_wait < max_wait_time);
709
710        if (n < size && total_time_in_wait >= max_wait_time)
711        {
712            // failed to read entire message
713            throw wrapper.transportReadTimeoutExceeded(new Integer(size),
714                                      new Integer(n), new Long(max_wait_time),
715                                      new Long(total_time_in_wait));
716        }
717
718        getConnectionCache().stampTime(this);
719    }
720
721    public void write(ByteBuffer byteBuffer)
722        throws IOException
723    {
724        if (shouldUseDirectByteBuffers()) {
725            /* NOTE: cannot perform this test.  If one ask for a
726               ByteBuffer from the pool which is bigger than the size
727               of ByteBuffers managed by the pool, then the pool will
728               return a HeapByteBuffer.
729            if (byteBuffer.hasArray()) {
730                throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
731            }
732            */
733            // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
734            //            all bytes are written on first write attempt.
735            do {
736                getSocketChannel().write(byteBuffer);
737            }
738            while (byteBuffer.hasRemaining());
739
740        } else {
741            if (! byteBuffer.hasArray()) {
742                throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
743            }
744            byte[] tmpBuf = byteBuffer.array();
745            getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
746            getSocket().getOutputStream().flush();
747        }
748
749        // TimeStamp connection to indicate it has been used
750        // Note granularity of connection usage is assumed for
751        // now to be that of a IIOP packet.
752        getConnectionCache().stampTime(this);
753    }
754
755    /**
756     * Note:it is possible for this to be called more than once
757     */
758    public synchronized void close()
759    {
760        try {
761            if (orb.transportDebugFlag) {
762                dprint(".close->: " + this);
763            }
764            writeLock();
765
766            // REVISIT It will be good to have a read lock on the reader thread
767            // before we proceed further, to avoid the reader thread (server side)
768            // from processing requests. This avoids the risk that a new request
769            // will be accepted by ReaderThread while the ListenerThread is
770            // attempting to close this connection.
771
772            if (isBusy()) { // we are busy!
773                writeUnlock();
774                if (orb.transportDebugFlag) {
775                    dprint(".close: isBusy so no close: " + this);
776                }
777                return;
778            }
779
780            try {
781                try {
782                    sendCloseConnection(GIOPVersion.V1_0);
783                } catch (Throwable t) {
784                    wrapper.exceptionWhenSendingCloseConnection(t);
785                }
786
787                synchronized ( stateEvent ){
788                    state = CLOSE_SENT;
789                    stateEvent.notifyAll();
790                }
791
792                // stop the reader without causing it to do purgeCalls
793                //Exception ex = new Exception();
794                //reader.stop(ex); // REVISIT
795
796                // NOTE: !!!!!!
797                // This does writeUnlock().
798                purgeCalls(wrapper.connectionRebind(), false, true);
799
800            } catch (Exception ex) {
801                if (orb.transportDebugFlag) {
802                    dprint(".close: exception: " + this, ex);
803                }
804            }
805            try {
806                Selector selector = orb.getTransportManager().getSelector(0);
807                if (selector != null) {
808                    selector.unregisterForEvent(this);
809                }
810                if (socketChannel != null) {
811                    socketChannel.close();
812                }
813                socket.close();
814            } catch (IOException e) {
815                if (orb.transportDebugFlag) {
816                    dprint(".close: " + this, e);
817                }
818            }
819            closeConnectionResources();
820        } finally {
821            if (orb.transportDebugFlag) {
822                dprint(".close<-: " + this);
823            }
824        }
825    }
826
827    public void closeConnectionResources() {
828           if (orb.transportDebugFlag) {
829               dprint(".closeConnectionResources->: " + this);
830           }
831           Selector selector = orb.getTransportManager().getSelector(0);
832           if (selector != null) {
833               selector.unregisterForEvent(this);
834           }
835           try {
836             if (socketChannel != null)
837              socketChannel.close() ;
838                if (socket != null && !socket.isClosed())
839                socket.close() ;
840           } catch (IOException e) {
841             if (orb.transportDebugFlag) {
842                 dprint( ".closeConnectionResources: " + this, e ) ;
843             }
844           }
845           if (orb.transportDebugFlag) {
846               dprint(".closeConnectionResources<-: " + this);
847           }
848     }
849
850
851    public Acceptor getAcceptor()
852    {
853        return acceptor;
854    }
855
856    public ContactInfo getContactInfo()
857    {
858        return contactInfo;
859    }
860
861    public EventHandler getEventHandler()
862    {
863        return this;
864    }
865
866    public OutputObject createOutputObject(MessageMediator messageMediator)
867    {
868        // REVISIT - remove this method from Connection and all it subclasses.
869        throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
870    }
871
872    // This is used by the GIOPOutputObject in order to
873    // throw the correct error when handling code sets.
874    // Can we determine if we are on the server side by
875    // other means?  XREVISIT
876    public boolean isServer()
877    {
878        return isServer;
879    }
880
881    public boolean isBusy()
882    {
883        if (serverRequestCount > 0 ||
884            getResponseWaitingRoom().numberRegistered() > 0)
885        {
886            return true;
887        } else {
888            return false;
889        }
890    }
891
892    public long getTimeStamp()
893    {
894        return timeStamp;
895    }
896
897    public void setTimeStamp(long time)
898    {
899        timeStamp = time;
900    }
901
902    public void setState(String stateString)
903    {
904        synchronized (stateEvent) {
905            if (stateString.equals("ESTABLISHED")) {
906                state =  ESTABLISHED;
907                stateEvent.notifyAll();
908            } else {
909                // REVISIT: ASSERT
910            }
911        }
912    }
913
914    /**
915     * Sets the writeLock for this connection.
916     * If the writeLock is already set by someone else, block till the
917     * writeLock is released and can set by us.
918     * IMPORTANT: this connection's lock must be acquired before
919     * setting the writeLock and must be unlocked after setting the writeLock.
920     */
921    public void writeLock()
922    {
923      try {
924        if (dprintWriteLocks && orb.transportDebugFlag) {
925            dprint(".writeLock->: " + this);
926        }
927        // Keep looping till we can set the writeLock.
928        while ( true ) {
929            int localState = state;
930            switch ( localState ) {
931
932            case OPENING:
933                synchronized (stateEvent) {
934                    if (state != OPENING) {
935                        // somebody has changed 'state' so be careful
936                        break;
937                    }
938                    try {
939                        stateEvent.wait();
940                    } catch (InterruptedException ie) {
941                        if (orb.transportDebugFlag) {
942                            dprint(".writeLock: OPENING InterruptedException: " + this);
943                        }
944                    }
945                }
946                // Loop back
947                break;
948
949            case ESTABLISHED:
950                synchronized (writeEvent) {
951                    if (!writeLocked) {
952                        writeLocked = true;
953                        return;
954                    }
955
956                    try {
957                        // do not stay here too long if state != ESTABLISHED
958                        // Bug 4752117
959                        while (state == ESTABLISHED && writeLocked) {
960                            writeEvent.wait(100);
961                        }
962                    } catch (InterruptedException ie) {
963                        if (orb.transportDebugFlag) {
964                            dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
965                        }
966                    }
967                }
968                // Loop back
969                break;
970
971                //
972                // XXX
973                // Need to distinguish between client and server roles
974                // here probably.
975                //
976            case ABORT:
977                synchronized ( stateEvent ){
978                    if (state != ABORT) {
979                        break;
980                    }
981                    throw wrapper.writeErrorSend() ;
982                }
983
984            case CLOSE_RECVD:
985                // the connection has been closed or closing
986                // ==> throw rebind exception
987                synchronized ( stateEvent ){
988                    if (state != CLOSE_RECVD) {
989                        break;
990                    }
991                    throw wrapper.connectionCloseRebind() ;
992                }
993
994            default:
995                if (orb.transportDebugFlag) {
996                    dprint(".writeLock: default: " + this);
997                }
998                // REVISIT
999                throw new RuntimeException(".writeLock: bad state");
1000            }
1001        }
1002      } finally {
1003        if (dprintWriteLocks && orb.transportDebugFlag) {
1004            dprint(".writeLock<-: " + this);
1005        }
1006      }
1007    }
1008
1009    public void writeUnlock()
1010    {
1011        try {
1012            if (dprintWriteLocks && orb.transportDebugFlag) {
1013                dprint(".writeUnlock->: " + this);
1014            }
1015            synchronized (writeEvent) {
1016                writeLocked = false;
1017                writeEvent.notify(); // wake up one guy waiting to write
1018            }
1019        } finally {
1020            if (dprintWriteLocks && orb.transportDebugFlag) {
1021                dprint(".writeUnlock<-: " + this);
1022            }
1023        }
1024    }
1025
1026    // Assumes the caller handles writeLock and writeUnlock
1027    public void sendWithoutLock(OutputObject outputObject)
1028    {
1029        // Don't we need to check for CloseConnection
1030        // here?  REVISIT
1031
1032        // XREVISIT - Shouldn't the MessageMediator
1033        // be the one to handle writing the data here?
1034
1035        try {
1036
1037            // Write the fragment/message
1038
1039            CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
1040            cdrOutputObject.writeTo(this);
1041            // REVISIT - no flush?
1042            //socket.getOutputStream().flush();
1043
1044        } catch (IOException e1) {
1045
1046            /*
1047             * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
1048             * sending a CancelRequest for regular requests / locate requests
1049             */
1050
1051            // Since IIOPOutputStream's msgheader is set only once, and not
1052            // altered during sending multiple fragments, the original
1053            // msgheader will always have the requestId.
1054            // REVISIT This could be optimized to send a CancelRequest only
1055            // if any fragments had been sent already.
1056
1057            /* REVISIT: MOVE TO SUBCONTRACT
1058            Message msg = os.getMessage();
1059            if (msg.getType() == Message.GIOPRequest ||
1060                    msg.getType() == Message.GIOPLocateRequest) {
1061                GIOPVersion requestVersion = msg.getGIOPVersion();
1062                int requestId = MessageBase.getRequestId(msg);
1063                try {
1064                    sendCancelRequest(requestVersion, requestId);
1065                } catch (IOException e2) {
1066                    // most likely an abortive connection closure.
1067                    // ignore, since nothing more can be done.
1068                    if (orb.transportDebugFlag) {
1069
1070                }
1071            }
1072            */
1073
1074            // REVISIT When a send failure happens, purgeCalls() need to be
1075            // called to ensure that the connection is properly removed from
1076            // further usage (ie., cancelling pending requests with COMM_FAILURE
1077            // with an appropriate minor_code CompletionStatus.MAY_BE).
1078
1079            // Relying on the IIOPOutputStream (as noted below) is not
1080            // sufficient as it handles COMM_FAILURE only for the final
1081            // fragment (during invoke processing). Note that COMM_FAILURE could
1082            // happen while sending the initial fragments.
1083            // Also the IIOPOutputStream does not properly close the connection.
1084            // It simply removes the connection from the table. An orderly
1085            // closure is needed (ie., cancel pending requests on the connection
1086            // COMM_FAILURE as well.
1087
1088            // IIOPOutputStream will cleanup the connection info when it
1089            // sees this exception.
1090            SystemException exc = wrapper.writeErrorSend(e1);
1091            purgeCalls(exc, false, true);
1092            throw exc;
1093        }
1094    }
1095
1096    public void registerWaiter(MessageMediator messageMediator)
1097    {
1098        responseWaitingRoom.registerWaiter(messageMediator);
1099    }
1100
1101    public void unregisterWaiter(MessageMediator messageMediator)
1102    {
1103        responseWaitingRoom.unregisterWaiter(messageMediator);
1104    }
1105
1106    public InputObject waitForResponse(MessageMediator messageMediator)
1107    {
1108        return responseWaitingRoom.waitForResponse(messageMediator);
1109    }
1110
1111    public void setConnectionCache(ConnectionCache connectionCache)
1112    {
1113        this.connectionCache = connectionCache;
1114    }
1115
1116    public ConnectionCache getConnectionCache()
1117    {
1118        return connectionCache;
1119    }
1120
1121    ////////////////////////////////////////////////////
1122    //
1123    // EventHandler methods
1124    //
1125
1126    public void setUseSelectThreadToWait(boolean x)
1127    {
1128        useSelectThreadToWait = x;
1129        // REVISIT - Reading of a GIOP header only is information
1130        //           that should be passed into the constructor
1131        //           from the SocketOrChannelConnection factory.
1132        setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
1133    }
1134
1135    public void handleEvent()
1136    {
1137        if (orb.transportDebugFlag) {
1138            dprint(".handleEvent->: " + this);
1139        }
1140        getSelectionKey().interestOps(getSelectionKey().interestOps() &
1141                                      (~ getInterestOps()));
1142
1143        if (shouldUseWorkerThreadForEvent()) {
1144            Throwable throwable = null;
1145            try {
1146                int poolToUse = 0;
1147                if (shouldReadGiopHeaderOnly()) {
1148                    partialMessageMediator = readBits();
1149                    poolToUse =
1150                        partialMessageMediator.getThreadPoolToUse();
1151                }
1152
1153                if (orb.transportDebugFlag) {
1154                    dprint(".handleEvent: addWork to pool: " + poolToUse);
1155                }
1156                orb.getThreadPoolManager().getThreadPool(poolToUse)
1157                    .getWorkQueue(0).addWork(getWork());
1158            } catch (NoSuchThreadPoolException e) {
1159                throwable = e;
1160            } catch (NoSuchWorkQueueException e) {
1161                throwable = e;
1162            }
1163            // REVISIT: need to close connection.
1164            if (throwable != null) {
1165                if (orb.transportDebugFlag) {
1166                    dprint(".handleEvent: " + throwable);
1167                }
1168                INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
1169                i.initCause(throwable);
1170                throw i;
1171            }
1172        } else {
1173            if (orb.transportDebugFlag) {
1174                dprint(".handleEvent: doWork");
1175            }
1176            getWork().doWork();
1177        }
1178        if (orb.transportDebugFlag) {
1179            dprint(".handleEvent<-: " + this);
1180        }
1181    }
1182
1183    public SelectableChannel getChannel()
1184    {
1185        return socketChannel;
1186    }
1187
1188    public int getInterestOps()
1189    {
1190        return SelectionKey.OP_READ;
1191    }
1192
1193    //    public Acceptor getAcceptor() - already defined above.
1194
1195    public Connection getConnection()
1196    {
1197        return this;
1198    }
1199
1200    ////////////////////////////////////////////////////
1201    //
1202    // Work methods.
1203    //
1204
1205    public String getName()
1206    {
1207        return this.toString();
1208    }
1209
1210    public void doWork()
1211    {
1212        try {
1213            if (orb.transportDebugFlag) {
1214                dprint(".doWork->: " + this);
1215            }
1216
1217            // IMPORTANT: Sanity checks on SelectionKeys such as
1218            //            SelectorKey.isValid() should not be done
1219            //            here.
1220            //
1221
1222            if (!shouldReadGiopHeaderOnly()) {
1223                read();
1224            }
1225            else {
1226                // get the partialMessageMediator
1227                // created by SelectorThread
1228                CorbaMessageMediator messageMediator =
1229                                         this.getPartialMessageMediator();
1230
1231                // read remaining info needed in a MessageMediator
1232                messageMediator = finishReadingBits(messageMediator);
1233
1234                if (messageMediator != null) {
1235                    // Null can happen when client closes stream
1236                    // causing purgecalls.
1237                    dispatch(messageMediator);
1238                }
1239            }
1240        } catch (Throwable t) {
1241            if (orb.transportDebugFlag) {
1242                dprint(".doWork: ignoring Throwable: "
1243                       + t
1244                       + " " + this);
1245            }
1246        } finally {
1247            if (orb.transportDebugFlag) {
1248                dprint(".doWork<-: " + this);
1249            }
1250        }
1251    }
1252
1253    public void setEnqueueTime(long timeInMillis)
1254    {
1255        enqueueTime = timeInMillis;
1256    }
1257
1258    public long getEnqueueTime()
1259    {
1260        return enqueueTime;
1261    }
1262
1263    ////////////////////////////////////////////////////
1264    //
1265    // spi.transport.CorbaConnection.
1266    //
1267
1268    // IMPORTANT: Reader Threads must NOT read Giop header only.
1269    public boolean shouldReadGiopHeaderOnly() {
1270        return shouldReadGiopHeaderOnly;
1271    }
1272
1273    protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
1274        shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
1275    }
1276
1277    public ResponseWaitingRoom getResponseWaitingRoom()
1278    {
1279        return responseWaitingRoom;
1280    }
1281
    // REVISIT - the interface defines isServer, but it is already defined
    // in a higher interface.
1284
1285    public void serverRequestMapPut(int requestId,
1286                                    CorbaMessageMediator messageMediator)
1287    {
1288        serverRequestMap.put(new Integer(requestId), messageMediator);
1289    }
1290
1291    public CorbaMessageMediator serverRequestMapGet(int requestId)
1292    {
1293        return (CorbaMessageMediator)
1294            serverRequestMap.get(new Integer(requestId));
1295    }
1296
1297    public void serverRequestMapRemove(int requestId)
1298    {
1299        serverRequestMap.remove(new Integer(requestId));
1300    }
1301
1302
1303    // REVISIT: this is also defined in:
1304    // com.sun.corba.se.spi.legacy.connection.Connection
1305    public java.net.Socket getSocket()
1306    {
1307        return socket;
1308    }
1309
1310    /** It is possible for a Close Connection to have been
1311     ** sent here, but we will not check for this. A "lazy"
1312     ** Exception will be thrown in the Worker thread after the
1313     ** incoming request has been processed even though the connection
1314     ** is closed before the request is processed. This is o.k because
1315     ** it is a boundary condition. To prevent it we would have to add
1316     ** more locks which would reduce performance in the normal case.
1317     **/
1318    public synchronized void serverRequestProcessingBegins()
1319    {
1320        serverRequestCount++;
1321    }
1322
1323    public synchronized void serverRequestProcessingEnds()
1324    {
1325        serverRequestCount--;
1326    }
1327
1328    //
1329    //
1330    //
1331
1332    public synchronized int getNextRequestId()
1333    {
1334        return requestId++;
1335    }
1336
1337    // Negotiated code sets for char and wchar data
1338    protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
1339
1340    public ORB getBroker()
1341    {
1342        return orb;
1343    }
1344
1345    public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
1346        // Needs to be synchronized for the following case when the client
1347        // doesn't send the code set context twice, and we have two threads
1348        // in ServerRequestDispatcher processCodeSetContext.
1349        //
1350        // Thread A checks to see if there is a context, there is none, so
1351        //     it calls setCodeSetContext, getting the synch lock.
1352        // Thread B checks to see if there is a context.  If we didn't synch,
1353        //     it might decide to outlaw wchar/wstring.
1354        if (codeSetContext == null) {
1355            synchronized(this) {
1356                return codeSetContext;
1357            }
1358        }
1359
1360        return codeSetContext;
1361    }
1362
1363    public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
1364        // Double check whether or not we need to do this
1365        if (codeSetContext == null) {
1366
1367            if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
1368                OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
1369                // If the client says it's negotiated a code set that
1370                // isn't a fallback and we never said we support, then
1371                // it has a bug.
1372                throw wrapper.badCodesetsFromClient() ;
1373            }
1374
1375            codeSetContext = csc;
1376        }
1377    }
1378
1379    //
1380    // from iiop.IIOPConnection.java
1381    //
1382
1383    // Map request ID to an InputObject.
1384    // This is so the client thread can start unmarshaling
1385    // the reply and remove it from the out_calls map while the
1386    // ReaderThread can still obtain the input stream to give
1387    // new fragments.  Only the ReaderThread touches the clientReplyMap,
1388    // so it doesn't incur synchronization overhead.
1389
1390    public MessageMediator clientRequestMapGet(int requestId)
1391    {
1392        return responseWaitingRoom.getMessageMediator(requestId);
1393    }
1394
1395    protected MessageMediator clientReply_1_1;
1396
1397    public void clientReply_1_1_Put(MessageMediator x)
1398    {
1399        clientReply_1_1 = x;
1400    }
1401
1402    public MessageMediator clientReply_1_1_Get()
1403    {
1404        return  clientReply_1_1;
1405    }
1406
1407    public void clientReply_1_1_Remove()
1408    {
1409        clientReply_1_1 = null;
1410    }
1411
1412    protected MessageMediator serverRequest_1_1;
1413
1414    public void serverRequest_1_1_Put(MessageMediator x)
1415    {
1416        serverRequest_1_1 = x;
1417    }
1418
1419    public MessageMediator serverRequest_1_1_Get()
1420    {
1421        return  serverRequest_1_1;
1422    }
1423
1424    public void serverRequest_1_1_Remove()
1425    {
1426        serverRequest_1_1 = null;
1427    }
1428
1429    protected String getStateString( int state )
1430    {
1431        synchronized ( stateEvent ){
1432            switch (state) {
1433            case OPENING : return "OPENING" ;
1434            case ESTABLISHED : return "ESTABLISHED" ;
1435            case CLOSE_SENT : return "CLOSE_SENT" ;
1436            case CLOSE_RECVD : return "CLOSE_RECVD" ;
1437            case ABORT : return "ABORT" ;
1438            default : return "???" ;
1439            }
1440        }
1441    }
1442
1443    public synchronized boolean isPostInitialContexts() {
1444        return postInitialContexts;
1445    }
1446
1447    // Can never be unset...
1448    public synchronized void setPostInitialContexts(){
1449        postInitialContexts = true;
1450    }
1451
1452    /**
1453     * Wake up the outstanding requests on the connection, and hand them
1454     * COMM_FAILURE exception with a given minor code.
1455     *
1456     * Also, delete connection from connection table and
1457     * stop the reader thread.
1458
1459     * Note that this should only ever be called by the Reader thread for
1460     * this connection.
1461     *
1462     * @param minor_code The minor code for the COMM_FAILURE major code.
1463     * @param die Kill the reader thread (this thread) before exiting.
1464     */
1465    public void purgeCalls(SystemException systemException,
1466                           boolean die, boolean lockHeld)
1467    {
1468        int minor_code = systemException.minor;
1469
1470        try{
1471            if (orb.transportDebugFlag) {
1472                dprint(".purgeCalls->: "
1473                       + minor_code + "/" + die + "/" + lockHeld
1474                       + " " + this);
1475            }
1476
1477            // If this invocation is a result of ThreadDeath caused
1478            // by a previous execution of this routine, just exit.
1479
1480            synchronized ( stateEvent ){
1481                if ((state == ABORT) || (state == CLOSE_RECVD)) {
1482                    if (orb.transportDebugFlag) {
1483                        dprint(".purgeCalls: exiting since state is: "
1484                               + getStateString(state)
1485                               + " " + this);
1486                    }
1487                    return;
1488                }
1489            }
1490
1491            // Grab the writeLock (freeze the calls)
1492            try {
1493                if (!lockHeld) {
1494                    writeLock();
1495                }
1496            } catch (SystemException ex) {
1497                if (orb.transportDebugFlag)
1498                    dprint(".purgeCalls: SystemException" + ex
1499                           + "; continuing " + this);
1500            }
1501
1502            // Mark the state of the connection
1503            // and determine the request status
1504            org.omg.CORBA.CompletionStatus completion_status;
1505            synchronized ( stateEvent ){
1506                if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
1507                    state = CLOSE_RECVD;
1508                    systemException.completed = CompletionStatus.COMPLETED_NO;
1509                } else {
1510                    state = ABORT;
1511                    systemException.completed = CompletionStatus.COMPLETED_MAYBE;
1512                }
1513                stateEvent.notifyAll();
1514            }
1515
1516            try {
1517                socket.getInputStream().close();
1518                socket.getOutputStream().close();
1519                socket.close();
1520            } catch (Exception ex) {
1521                if (orb.transportDebugFlag) {
1522                    dprint(".purgeCalls: Exception closing socket: " + ex
1523                           + " " + this);
1524                }
1525            }
1526
1527            // Signal all threads with outstanding requests on this
1528            // connection and give them the SystemException;
1529
1530            responseWaitingRoom.signalExceptionToAllWaiters(systemException);
1531        } finally {
1532            if (contactInfo != null) {
1533                ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
1534            } else if (acceptor != null) {
1535                ((InboundConnectionCache)getConnectionCache()).remove(this);
1536            }
1537
1538            //
1539            // REVISIT: Stop the reader thread
1540            //
1541
1542            // Signal all the waiters of the writeLock.
1543            // There are 4 types of writeLock waiters:
1544            // 1. Send waiters:
1545            // 2. SendReply waiters:
1546            // 3. cleanUp waiters:
1547            // 4. purge_call waiters:
1548            //
1549
1550            writeUnlock();
1551
1552            if (orb.transportDebugFlag) {
1553                dprint(".purgeCalls<-: "
1554                       + minor_code + "/" + die + "/" + lockHeld
1555                       + " " + this);
1556            }
1557        }
1558    }
1559
1560    /*************************************************************************
1561    * The following methods are for dealing with Connection cleaning for
1562    * better scalability of servers in high network load conditions.
1563    **************************************************************************/
1564
1565    public void sendCloseConnection(GIOPVersion giopVersion)
1566        throws IOException
1567    {
1568        Message msg = MessageBase.createCloseConnection(giopVersion);
1569        sendHelper(giopVersion, msg);
1570    }
1571
1572    public void sendMessageError(GIOPVersion giopVersion)
1573        throws IOException
1574    {
1575        Message msg = MessageBase.createMessageError(giopVersion);
1576        sendHelper(giopVersion, msg);
1577    }
1578
1579    /**
1580     * Send a CancelRequest message. This does not lock the connection, so the
1581     * caller needs to ensure this method is called appropriately.
1582     * @exception IOException - could be due to abortive connection closure.
1583     */
1584    public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
1585        throws IOException
1586    {
1587
1588        Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
1589        sendHelper(giopVersion, msg);
1590    }
1591
1592    protected void sendHelper(GIOPVersion giopVersion, Message msg)
1593        throws IOException
1594    {
1595        // REVISIT: See comments in CDROutputObject constructor.
1596        CDROutputObject outputObject =
1597            sun.corba.OutputStreamFactory.newCDROutputObject((ORB)orb, null, giopVersion,
1598                                this, msg, ORBConstants.STREAM_FORMAT_VERSION_1);
1599        msg.write(outputObject);
1600
1601        outputObject.writeTo(this);
1602    }
1603
1604    public void sendCancelRequestWithLock(GIOPVersion giopVersion,
1605                                          int requestId)
1606        throws IOException
1607    {
1608        writeLock();
1609        try {
1610            sendCancelRequest(giopVersion, requestId);
1611        } finally {
1612            writeUnlock();
1613        }
1614    }
1615
1616    // Begin Code Base methods ---------------------------------------
1617    //
1618    // Set this connection's code base IOR.  The IOR comes from the
1619    // SendingContext.  This is an optional service context, but all
1620    // JavaSoft ORBs send it.
1621    //
1622    // The set and get methods don't need to be synchronized since the
1623    // first possible get would occur during reading a valuetype, and
1624    // that would be after the set.
1625
1626    // Sets this connection's code base IOR.  This is done after
1627    // getting the IOR out of the SendingContext service context.
1628    // Our ORBs always send this, but it's optional in CORBA.
1629
1630    public final void setCodeBaseIOR(IOR ior) {
1631        codeBaseServerIOR = ior;
1632    }
1633
1634    public final IOR getCodeBaseIOR() {
1635        return codeBaseServerIOR;
1636    }
1637
1638    // Get a CodeBase stub to use in unmarshaling.  The CachedCodeBase
1639    // won't connect to the remote codebase unless it's necessary.
1640    public final CodeBase getCodeBase() {
1641        return cachedCodeBase;
1642    }
1643
1644    // End Code Base methods -----------------------------------------
1645
1646    // set transport read thresholds
1647    protected void setReadTimeouts(ReadTimeouts readTimeouts) {
1648        this.readTimeouts = readTimeouts;
1649    }
1650
1651    protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
1652        partialMessageMediator = messageMediator;
1653    }
1654
1655    protected CorbaMessageMediator getPartialMessageMediator() {
1656        return partialMessageMediator;
1657    }
1658
1659    public String toString()
1660    {
1661        synchronized ( stateEvent ){
1662            return
1663                "SocketOrChannelConnectionImpl[" + " "
1664                + (socketChannel == null ?
1665                   socket.toString() : socketChannel.toString()) + " "
1666                + getStateString( state ) + " "
1667                + shouldUseSelectThreadToWait() + " "
1668                + shouldUseWorkerThreadForEvent() + " "
1669                + shouldReadGiopHeaderOnly()
1670                + "]" ;
1671        }
1672    }
1673
1674    // Must be public - used in encoding.
1675    public void dprint(String msg)
1676    {
1677        ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
1678    }
1679
1680    protected void dprint(String msg, Throwable t)
1681    {
1682        dprint(msg);
1683        t.printStackTrace(System.out);
1684    }
1685}
1686
1687// End of file.
1688