CorbaMessageMediatorImpl.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.protocol;
27
28import java.io.ByteArrayOutputStream;
29import java.io.IOException;
30import java.io.PrintWriter;
31import java.nio.ByteBuffer;
32import java.nio.channels.SelectionKey;
33import java.util.EmptyStackException;
34import java.util.Iterator;
35
36import org.omg.CORBA.Any;
37import org.omg.CORBA.CompletionStatus;
38import org.omg.CORBA.ExceptionList;
39import org.omg.CORBA.INTERNAL;
40import org.omg.CORBA.Principal;
41import org.omg.CORBA.SystemException;
42import org.omg.CORBA.TypeCode;
43import org.omg.CORBA.UnknownUserException;
44import org.omg.CORBA.UNKNOWN;
45import org.omg.CORBA.portable.ResponseHandler;
46import org.omg.CORBA.portable.UnknownException;
47import org.omg.CORBA_2_3.portable.InputStream;
48import org.omg.CORBA_2_3.portable.OutputStream;
49import org.omg.IOP.ExceptionDetailMessage;
50import org.omg.IOP.TAG_RMI_CUSTOM_MAX_STREAM_FORMAT;
51
52import com.sun.corba.se.pept.broker.Broker;
53import com.sun.corba.se.pept.encoding.InputObject;
54import com.sun.corba.se.pept.encoding.OutputObject;
55import com.sun.corba.se.pept.protocol.MessageMediator;
56import com.sun.corba.se.pept.protocol.ProtocolHandler;
57import com.sun.corba.se.pept.transport.ByteBufferPool;
58import com.sun.corba.se.pept.transport.Connection;
59import com.sun.corba.se.pept.transport.ContactInfo;
60import com.sun.corba.se.pept.transport.EventHandler;
61
62import com.sun.corba.se.spi.ior.IOR;
63import com.sun.corba.se.spi.ior.ObjectKey;
64import com.sun.corba.se.spi.ior.ObjectKeyTemplate;
65import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
66import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate;
67import com.sun.corba.se.spi.ior.iiop.IIOPProfile;
68import com.sun.corba.se.spi.ior.iiop.MaxStreamFormatVersionComponent;
69import com.sun.corba.se.spi.oa.OAInvocationInfo;
70import com.sun.corba.se.spi.oa.ObjectAdapter;
71import com.sun.corba.se.spi.orb.ORB;
72import com.sun.corba.se.spi.orb.ORBVersionFactory;
73import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
74import com.sun.corba.se.spi.protocol.CorbaProtocolHandler;
75import com.sun.corba.se.spi.protocol.CorbaServerRequestDispatcher;
76import com.sun.corba.se.spi.protocol.ForwardException;
77import com.sun.corba.se.spi.transport.CorbaConnection;
78import com.sun.corba.se.spi.transport.CorbaContactInfo;
79import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
80import com.sun.corba.se.spi.logging.CORBALogDomains;
81
82import com.sun.corba.se.spi.servicecontext.ORBVersionServiceContext;
83import com.sun.corba.se.spi.servicecontext.ServiceContexts;
84import com.sun.corba.se.spi.servicecontext.UEInfoServiceContext;
85import com.sun.corba.se.spi.servicecontext.MaxStreamFormatVersionServiceContext;
86import com.sun.corba.se.spi.servicecontext.SendingContextServiceContext;
87import com.sun.corba.se.spi.servicecontext.UnknownServiceContext;
88
89import com.sun.corba.se.impl.corba.RequestImpl;
90import com.sun.corba.se.impl.encoding.BufferManagerFactory;
91import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
92import com.sun.corba.se.impl.encoding.CDRInputObject;
93import com.sun.corba.se.impl.encoding.CDROutputObject;
94import com.sun.corba.se.impl.encoding.EncapsOutputStream;
95import com.sun.corba.se.impl.logging.ORBUtilSystemException;
96import com.sun.corba.se.impl.logging.InterceptorsSystemException;
97import com.sun.corba.se.impl.orbutil.ORBConstants;
98import com.sun.corba.se.impl.orbutil.ORBUtility;
99import com.sun.corba.se.impl.ior.iiop.JavaSerializationComponent;
100import com.sun.corba.se.impl.protocol.AddressingDispositionException;
101import com.sun.corba.se.impl.protocol.RequestCanceledException;
102import com.sun.corba.se.impl.protocol.giopmsgheaders.AddressingDispositionHelper;
103import com.sun.corba.se.impl.protocol.giopmsgheaders.CancelRequestMessage;
104import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_1;
105import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_2;
106import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage;
107import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_0;
108import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_1;
109import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_2;
110import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
111import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage;
112import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_0;
113import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_1;
114import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_2;
115import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
116import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
117import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageHandler;
118import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
119import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_0;
120import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_1;
121import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_2;
122import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage;
123import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_0 ;
124import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_1 ;
125import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_2 ;
126
127// REVISIT: make sure no memory leaks in client/server request/reply maps.
128// REVISIT: normalize requestHeader, replyHeader, messageHeader.
129
130/**
131 * @author Harold Carr
132 */
133public class CorbaMessageMediatorImpl
134    implements
135        CorbaMessageMediator,
136        CorbaProtocolHandler,
137        MessageHandler
138{
139    protected ORB orb;
140    protected ORBUtilSystemException wrapper ;
141    protected InterceptorsSystemException interceptorWrapper ;
142    protected CorbaContactInfo contactInfo;
143    protected CorbaConnection connection;
144    protected short addrDisposition;
145    protected CDROutputObject outputObject;
146    protected CDRInputObject inputObject;
147    protected Message messageHeader;
148    protected RequestMessage requestHeader;
149    protected LocateReplyOrReplyMessage replyHeader;
150    protected String replyExceptionDetailMessage;
151    protected IOR replyIOR;
152    protected Integer requestIdInteger;
153    protected Message dispatchHeader;
154    protected ByteBuffer dispatchByteBuffer;
155    protected byte streamFormatVersion;
156    protected boolean streamFormatVersionSet = false;
157
158    protected org.omg.CORBA.Request diiRequest;
159
160    protected boolean cancelRequestAlreadySent = false;
161
162    protected ProtocolHandler protocolHandler;
163    protected boolean _executeReturnServantInResponseConstructor = false;
164    protected boolean _executeRemoveThreadInfoInResponseConstructor = false;
165    protected boolean _executePIInResponseConstructor = false;
166
167    //
168    // Client-side constructor.
169    //
170
171    public CorbaMessageMediatorImpl(ORB orb,
172                                    ContactInfo contactInfo,
173                                    Connection connection,
174                                    GIOPVersion giopVersion,
175                                    IOR ior,
176                                    int requestId,
177                                    short addrDisposition,
178                                    String operationName,
179                                    boolean isOneWay)
180    {
181        this( orb, connection ) ;
182
183        this.contactInfo = (CorbaContactInfo) contactInfo;
184        this.addrDisposition = addrDisposition;
185
186        streamFormatVersion =
187            getStreamFormatVersionForThisRequest(
188                ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(),
189                giopVersion);
190        streamFormatVersionSet = true;
191
192        requestHeader = (RequestMessage) MessageBase.createRequest(
193            this.orb,
194            giopVersion,
195            ORBUtility.getEncodingVersion(orb, ior),
196            requestId,
197            !isOneWay,
198            ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(),
199            this.addrDisposition,
200            operationName,
201            new ServiceContexts(orb),
202            null);
203    }
204
205    //
206    // Acceptor constructor.
207    //
208
209    public CorbaMessageMediatorImpl(ORB orb,
210                                    Connection connection)
211    {
212        this.orb = orb;
213        this.connection = (CorbaConnection)connection;
214        this.wrapper = ORBUtilSystemException.get( orb,
215            CORBALogDomains.RPC_PROTOCOL ) ;
216        this.interceptorWrapper = InterceptorsSystemException.get( orb,
217            CORBALogDomains.RPC_PROTOCOL ) ;
218    }
219
220    //
221    // Dispatcher constructor.
222    //
223
224    // Note: in some cases (e.g., a reply message) this message
225    // mediator will only be used for dispatch.  Then the original
226    // request side mediator will take over.
227    public CorbaMessageMediatorImpl(ORB orb,
228                                    CorbaConnection connection,
229                                    Message dispatchHeader,
230                                    ByteBuffer byteBuffer)
231    {
232        this( orb, connection ) ;
233        this.dispatchHeader = dispatchHeader;
234        this.dispatchByteBuffer = byteBuffer;
235    }
236
237    ////////////////////////////////////////////////////
238    //
239    // MessageMediator
240    //
241
242    public Broker getBroker()
243    {
244        return orb;
245    }
246
247    public ContactInfo getContactInfo()
248    {
249        return contactInfo;
250    }
251
252    public Connection getConnection()
253    {
254        return connection;
255    }
256
257    public void initializeMessage()
258    {
259        getRequestHeader().write(outputObject);
260    }
261
262    public void finishSendingRequest()
263    {
264        // REVISIT: probably move logic in outputObject to here.
265        outputObject.finishSendingMessage();
266    }
267
268    public InputObject waitForResponse()
269    {
270        if (getRequestHeader().isResponseExpected()) {
271            return connection.waitForResponse(this);
272        }
273        return null;
274    }
275
276    public void setOutputObject(OutputObject outputObject)
277    {
278        this.outputObject = (CDROutputObject) outputObject;
279    }
280
281    public OutputObject getOutputObject()
282    {
283        return outputObject;
284    }
285
286    public void setInputObject(InputObject inputObject)
287    {
288        this.inputObject = (CDRInputObject) inputObject;
289    }
290
291    public InputObject getInputObject()
292    {
293        return inputObject;
294    }
295
296    ////////////////////////////////////////////////////
297    //
298    // CorbaMessageMediator
299    //
300
301    public void setReplyHeader(LocateReplyOrReplyMessage header)
302    {
303        this.replyHeader = header;
304        this.replyIOR = header.getIOR(); // REVISIT - need separate field?
305    }
306
307    public LocateReplyMessage getLocateReplyHeader()
308    {
309        return (LocateReplyMessage) replyHeader;
310    }
311
312    public ReplyMessage getReplyHeader()
313    {
314        return (ReplyMessage) replyHeader;
315    }
316
317    public void setReplyExceptionDetailMessage(String message)
318    {
319        replyExceptionDetailMessage = message;
320    }
321
322    public RequestMessage getRequestHeader()
323    {
324        return requestHeader;
325    }
326
327    public GIOPVersion getGIOPVersion()
328    {
329        if (messageHeader != null) {
330            return messageHeader.getGIOPVersion();
331        }
332        return getRequestHeader().getGIOPVersion();
333    }
334
335    public byte getEncodingVersion() {
336        if (messageHeader != null) {
337            return messageHeader.getEncodingVersion();
338        }
339        return getRequestHeader().getEncodingVersion();
340    }
341
342    public int getRequestId()
343    {
344        return getRequestHeader().getRequestId();
345    }
346
347    public Integer getRequestIdInteger()
348    {
349        if (requestIdInteger == null) {
350            requestIdInteger = new Integer(getRequestHeader().getRequestId());
351        }
352        return requestIdInteger;
353    }
354
355    public boolean isOneWay()
356    {
357        return ! getRequestHeader().isResponseExpected();
358    }
359
360    public short getAddrDisposition()
361    {
362        return addrDisposition;
363    }
364
365    public String getOperationName()
366    {
367        return getRequestHeader().getOperation();
368    }
369
370    public ServiceContexts getRequestServiceContexts()
371    {
372        return getRequestHeader().getServiceContexts();
373    }
374
375    public ServiceContexts getReplyServiceContexts()
376    {
377        return getReplyHeader().getServiceContexts();
378    }
379
380    public void sendCancelRequestIfFinalFragmentNotSent()
381    {
382        if ((!sentFullMessage()) && sentFragment() &&
383            (!cancelRequestAlreadySent))
384        {
385            try {
386                if (orb.subcontractDebugFlag) {
387                    dprint(".sendCancelRequestIfFinalFragmentNotSent->: "
388                           + opAndId(this));
389                }
390                connection.sendCancelRequestWithLock(getGIOPVersion(),
391                                                     getRequestId());
392                // Case: first a location forward, then a marshaling
393                // exception (e.g., non-serializable object).  Only
394                // send cancel once.
395                cancelRequestAlreadySent = true;
396            } catch (IOException e) {
397                if (orb.subcontractDebugFlag) {
398                    dprint(".sendCancelRequestIfFinalFragmentNotSent: !ERROR : " + opAndId(this),
399                           e);
400                }
401
402                // REVISIT: we could attempt to send a final incomplete
403                // fragment in this case.
404                throw interceptorWrapper.ioexceptionDuringCancelRequest(
405                    CompletionStatus.COMPLETED_MAYBE, e );
406            } finally {
407                if (orb.subcontractDebugFlag) {
408                    dprint(".sendCancelRequestIfFinalFragmentNotSent<-: "
409                           + opAndId(this));
410                }
411            }
412        }
413    }
414
415    public boolean sentFullMessage()
416    {
417        return outputObject.getBufferManager().sentFullMessage();
418    }
419
420    public boolean sentFragment()
421    {
422        return outputObject.getBufferManager().sentFragment();
423    }
424
425    public void setDIIInfo(org.omg.CORBA.Request diiRequest)
426    {
427        this.diiRequest = diiRequest;
428    }
429
430    public boolean isDIIRequest()
431    {
432        return diiRequest != null;
433    }
434
435    public Exception unmarshalDIIUserException(String repoId, InputStream is)
436    {
437        if (! isDIIRequest()) {
438            return null;
439        }
440
441        ExceptionList _exceptions = diiRequest.exceptions();
442
443        try {
444            // Find the typecode for the exception
445            for (int i=0; i<_exceptions.count() ; i++) {
446                TypeCode tc = _exceptions.item(i);
447                if ( tc.id().equals(repoId) ) {
448                    // Since we dont have the actual user exception
449                    // class, the spec says we have to create an
450                    // UnknownUserException and put it in the
451                    // environment.
452                    Any eany = orb.create_any();
453                    eany.read_value(is, (TypeCode)tc);
454
455                    return new UnknownUserException(eany);
456                }
457            }
458        } catch (Exception b) {
459            throw wrapper.unexpectedDiiException(b);
460        }
461
462        // must be a truly unknown exception
463        return wrapper.unknownCorbaExc( CompletionStatus.COMPLETED_MAYBE);
464    }
465
466    public void setDIIException(Exception exception)
467    {
468        diiRequest.env().exception(exception);
469    }
470
471    public void handleDIIReply(InputStream inputStream)
472    {
473        if (! isDIIRequest()) {
474            return;
475        }
476        ((RequestImpl)diiRequest).unmarshalReply(inputStream);
477    }
478
479    public Message getDispatchHeader()
480    {
481        return dispatchHeader;
482    }
483
484    public void setDispatchHeader(Message msg)
485    {
486        dispatchHeader = msg;
487    }
488
489    public ByteBuffer getDispatchBuffer()
490    {
491        return dispatchByteBuffer;
492    }
493
494    public void setDispatchBuffer(ByteBuffer byteBuffer)
495    {
496        dispatchByteBuffer = byteBuffer;
497    }
498
499    public int getThreadPoolToUse() {
500        int poolToUse = 0;
501        Message msg = getDispatchHeader();
502        // A null msg should never happen. But, we'll be
503        // defensive just in case.
504        if (msg != null) {
505            poolToUse = msg.getThreadPoolToUse();
506        }
507        return poolToUse;
508    }
509
510    public byte getStreamFormatVersion()
511    {
512        // REVISIT: ContactInfo/Acceptor output object factories
513        // just use this.  Maybe need to distinguish:
514        //    createOutputObjectForRequest
515        //    createOutputObjectForReply
516        // then do getStreamFormatVersionForRequest/ForReply here.
517        if (streamFormatVersionSet) {
518            return streamFormatVersion;
519        }
520        return getStreamFormatVersionForReply();
521    }
522
523    /**
524     * If the RMI-IIOP maximum stream format version service context
525     * is present, it indicates the maximum stream format version we
526     * could use for the reply.  If it isn't present, the default is
527     * 2 for GIOP 1.3 or greater, 1 for lower.
528     *
529     * This is only sent on requests.  Clients can find out the
530     * server's maximum by looking for a tagged component in the IOR.
531     */
532    public byte getStreamFormatVersionForReply() {
533
534        // NOTE: The request service contexts may indicate the max.
535        ServiceContexts svc = getRequestServiceContexts();
536
537        MaxStreamFormatVersionServiceContext msfvsc
538            = (MaxStreamFormatVersionServiceContext)svc.get(
539                MaxStreamFormatVersionServiceContext.SERVICE_CONTEXT_ID);
540
541        if (msfvsc != null) {
542            byte localMaxVersion = ORBUtility.getMaxStreamFormatVersion();
543            byte remoteMaxVersion = msfvsc.getMaximumStreamFormatVersion();
544
545            return (byte)Math.min(localMaxVersion, remoteMaxVersion);
546        } else {
547            // Defaults to 1 for GIOP 1.2 or less, 2 for
548            // GIOP 1.3 or higher.
549            if (getGIOPVersion().lessThan(GIOPVersion.V1_3))
550                return ORBConstants.STREAM_FORMAT_VERSION_1;
551            else
552                return ORBConstants.STREAM_FORMAT_VERSION_2;
553        }
554    }
555
556    public boolean isSystemExceptionReply()
557    {
558        return replyHeader.getReplyStatus() == ReplyMessage.SYSTEM_EXCEPTION;
559    }
560
561    public boolean isUserExceptionReply()
562    {
563        return replyHeader.getReplyStatus() == ReplyMessage.USER_EXCEPTION;
564    }
565
566    public boolean isLocationForwardReply()
567    {
568        return ( (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD) ||
569                 (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD_PERM) );
570        //return replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD;
571    }
572
573    public boolean isDifferentAddrDispositionRequestedReply()
574    {
575        return replyHeader.getReplyStatus() == ReplyMessage.NEEDS_ADDRESSING_MODE;
576    }
577
578    public short getAddrDispositionReply()
579    {
580        return replyHeader.getAddrDisposition();
581    }
582
583    public IOR getForwardedIOR()
584    {
585        return replyHeader.getIOR();
586    }
587
588    public SystemException getSystemExceptionReply()
589    {
590        return replyHeader.getSystemException(replyExceptionDetailMessage);
591    }
592
593    ////////////////////////////////////////////////////
594    //
595    // Used by server side.
596    //
597
598    public ObjectKey getObjectKey()
599    {
600        return getRequestHeader().getObjectKey();
601    }
602
603    public void setProtocolHandler(CorbaProtocolHandler protocolHandler)
604    {
605        throw wrapper.methodShouldNotBeCalled() ;
606    }
607
608    public CorbaProtocolHandler getProtocolHandler()
609    {
610        // REVISIT: should look up in orb registry.
611        return this;
612    }
613
614    ////////////////////////////////////////////////////
615    //
616    // ResponseHandler
617    //
618
619    public org.omg.CORBA.portable.OutputStream createReply()
620    {
621        // Note: relies on side-effect of setting mediator output field.
622        // REVISIT - cast - need interface
623        getProtocolHandler().createResponse(this, (ServiceContexts) null);
624        return (OutputStream) getOutputObject();
625    }
626
627    public org.omg.CORBA.portable.OutputStream createExceptionReply()
628    {
629        // Note: relies on side-effect of setting mediator output field.
630        // REVISIT - cast - need interface
631        getProtocolHandler().createUserExceptionResponse(this, (ServiceContexts) null);
632        return (OutputStream) getOutputObject();
633    }
634
635    public boolean executeReturnServantInResponseConstructor()
636    {
637        return _executeReturnServantInResponseConstructor;
638
639    }
640
641    public void setExecuteReturnServantInResponseConstructor(boolean b)
642    {
643        _executeReturnServantInResponseConstructor = b;
644    }
645
646    public boolean executeRemoveThreadInfoInResponseConstructor()
647    {
648        return _executeRemoveThreadInfoInResponseConstructor;
649    }
650
651    public void setExecuteRemoveThreadInfoInResponseConstructor(boolean b)
652    {
653        _executeRemoveThreadInfoInResponseConstructor = b;
654    }
655
656    public boolean executePIInResponseConstructor()
657    {
658        return _executePIInResponseConstructor;
659    }
660
661    public void setExecutePIInResponseConstructor( boolean b )
662    {
663        _executePIInResponseConstructor = b;
664    }
665
666    private byte getStreamFormatVersionForThisRequest(IOR ior,
667                                                      GIOPVersion giopVersion)
668    {
669
670        byte localMaxVersion
671            = ORBUtility.getMaxStreamFormatVersion();
672
673        IOR effectiveTargetIOR =
674            ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR();
675        IIOPProfileTemplate temp =
676            (IIOPProfileTemplate)effectiveTargetIOR.getProfile().getTaggedProfileTemplate();
677        Iterator iter = temp.iteratorById(TAG_RMI_CUSTOM_MAX_STREAM_FORMAT.value);
678        if (!iter.hasNext()) {
679            // Didn't have the max stream format version tagged
680            // component.
681            if (giopVersion.lessThan(GIOPVersion.V1_3))
682                return ORBConstants.STREAM_FORMAT_VERSION_1;
683            else
684                return ORBConstants.STREAM_FORMAT_VERSION_2;
685        }
686
687        byte remoteMaxVersion
688            = ((MaxStreamFormatVersionComponent)iter.next()).getMaxStreamFormatVersion();
689
690        return (byte)Math.min(localMaxVersion, remoteMaxVersion);
691    }
692
693    ////////////////////////////////////////////////////////////////////////
694    ////////////////////////////////////////////////////////////////////////
695    ////////////////////////////////////////////////////////////////////////
696
697    // REVISIT - This could be a separate implementation object looked
698    // up in a registry.  However it needs some state in the message
699    // mediator so combine for now.
700
701
702    protected boolean isThreadDone = false;
703
704    ////////////////////////////////////////////////////
705    //
706    // pept.protocol.ProtocolHandler
707    //
708
709    public boolean handleRequest(MessageMediator messageMediator)
710    {
711        try {
712            dispatchHeader.callback(this);
713        } catch (IOException e) {
714            // REVISIT - this should be handled internally.
715            ;
716        }
717        return isThreadDone;
718    }
719
720    ////////////////////////////////////////////////////
721    //
722    // iiop.messages.MessageHandler
723    //
724
725    private void setWorkThenPoolOrResumeSelect(Message header)
726    {
727        if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) {
728            resumeSelect(header);
729        } else {
730            // Leader/Follower when using reader thread.
731            // When this thread is done working it will go back in pool.
732
733            isThreadDone = true;
734
735            // First unregister current registration.
736            orb.getTransportManager().getSelector(0)
737                .unregisterForEvent(getConnection().getEventHandler());
738            // Have another thread become the reader.
739            orb.getTransportManager().getSelector(0)
740                .registerForEvent(getConnection().getEventHandler());
741        }
742    }
743
744    private void setWorkThenReadOrResumeSelect(Message header)
745    {
746        if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) {
747            resumeSelect(header);
748        } else {
749            // When using reader thread then wen this thread is
750            // done working it will continue reading.
751            isThreadDone = false;
752        }
753    }
754
755    private void resumeSelect(Message header)
756    {
757        // NOTE: VERY IMPORTANT:
758        // Only participate in select after getting to the point
759        // that proper serialization of fragments is ensured.
760
761        if (transportDebug()) {
762            dprint(".resumeSelect:->");
763            // REVISIT: not-OO:
764            String requestId = "?";
765            if (header instanceof RequestMessage) {
766                requestId =
767                    new Integer(((RequestMessage)header)
768                                .getRequestId()).toString();
769            } else if (header instanceof ReplyMessage) {
770                requestId =
771                    new Integer(((ReplyMessage)header)
772                                .getRequestId()).toString();
773            } else if (header instanceof FragmentMessage_1_2) {
774                requestId =
775                    new Integer(((FragmentMessage_1_2)header)
776                                .getRequestId()).toString();
777            }
778            dprint(".resumeSelect: id/"
779                   + requestId
780                   + " " + getConnection()
781                   );
782
783        }
784
785        // IMPORTANT: To avoid bug (4953599), we force the Thread that does the NIO select
786        // to also do the enable/disable of Ops using SelectionKey.interestOps(Ops of Interest).
787        // Otherwise, the SelectionKey.interestOps(Ops of Interest) may block indefinitely in
788        // this thread.
789        EventHandler eventHandler = getConnection().getEventHandler();
790        orb.getTransportManager().getSelector(0).registerInterestOps(eventHandler);
791
792        if (transportDebug()) {
793            dprint(".resumeSelect:<-");
794        }
795    }
796
797    private void setInputObject()
798    {
799        // REVISIT: refactor createInputObject (and createMessageMediator)
800        // into base PlugInFactory.  Get via connection (either ContactInfo
801        // or Acceptor).
802        if (getConnection().getContactInfo() != null) {
803            inputObject = (CDRInputObject)
804                getConnection().getContactInfo()
805                .createInputObject(orb, this);
806        } else if (getConnection().getAcceptor() != null) {
807            inputObject = (CDRInputObject)
808                getConnection().getAcceptor()
809                .createInputObject(orb, this);
810        } else {
811            throw new RuntimeException("CorbaMessageMediatorImpl.setInputObject");
812        }
813        inputObject.setMessageMediator(this);
814        setInputObject(inputObject);
815    }
816
817    private void signalResponseReceived()
818    {
819        // This will end up using the MessageMediator associated with
820        // the original request instead of the current mediator (which
821        // need to be constructed to hold the dispatchBuffer and connection).
822        connection.getResponseWaitingRoom()
823            .responseReceived((InputObject)inputObject);
824    }
825
826    // This handles message types for which we don't create classes.
827    public void handleInput(Message header) throws IOException
828    {
829        try {
830            messageHeader = header;
831
832            if (transportDebug())
833                dprint(".handleInput->: "
834                       + MessageBase.typeToString(header.getType()));
835
836            setWorkThenReadOrResumeSelect(header);
837
838            switch(header.getType())
839            {
840            case Message.GIOPCloseConnection:
841                if (transportDebug()) {
842                    dprint(".handleInput: CloseConnection: purging");
843                }
844                connection.purgeCalls(wrapper.connectionRebind(), true, false);
845                break;
846            case Message.GIOPMessageError:
847                if (transportDebug()) {
848                    dprint(".handleInput: MessageError: purging");
849                }
850                connection.purgeCalls(wrapper.recvMsgError(), true, false);
851                break;
852            default:
853                if (transportDebug()) {
854                    dprint(".handleInput: ERROR: "
855                           + MessageBase.typeToString(header.getType()));
856                }
857                throw wrapper.badGiopRequestType() ;
858            }
859            releaseByteBufferToPool();
860        } finally {
861            if (transportDebug()) {
862                dprint(".handleInput<-: "
863                       + MessageBase.typeToString(header.getType()));
864            }
865        }
866    }
867
868    public void handleInput(RequestMessage_1_0 header) throws IOException
869    {
870        try {
871            if (transportDebug()) dprint(".REQUEST 1.0->: " + header);
872            try {
873                messageHeader = requestHeader = (RequestMessage) header;
874                setInputObject();
875            } finally {
876                setWorkThenPoolOrResumeSelect(header);
877            }
878            getProtocolHandler().handleRequest(header, this);
879        } catch (Throwable t) {
880            if (transportDebug())
881                dprint(".REQUEST 1.0: !!ERROR!!: " + header, t);
882            // Mask the exception from thread.;
883        } finally {
884            if (transportDebug()) dprint(".REQUEST 1.0<-: " + header);
885        }
886    }
887
888    public void handleInput(RequestMessage_1_1 header) throws IOException
889    {
890        try {
891            if (transportDebug()) dprint(".REQUEST 1.1->: " + header);
892            try {
893                messageHeader = requestHeader = (RequestMessage) header;
894                setInputObject();
895                connection.serverRequest_1_1_Put(this);
896            } finally {
897                setWorkThenPoolOrResumeSelect(header);
898            }
899            getProtocolHandler().handleRequest(header, this);
900        } catch (Throwable t) {
901            if (transportDebug())
902                dprint(".REQUEST 1.1: !!ERROR!!: " + header, t);
903            // Mask the exception from thread.;
904        } finally {
905            if (transportDebug()) dprint(".REQUEST 1.1<-: " + header);
906        }
907    }
908
909    // REVISIT: this is identical to 1_0 except for fragment part.
910    public void handleInput(RequestMessage_1_2 header) throws IOException
911    {
912        try {
913            try {
914
915                messageHeader = requestHeader = (RequestMessage) header;
916
917                header.unmarshalRequestID(dispatchByteBuffer);
918                setInputObject();
919
920                if (transportDebug()) dprint(".REQUEST 1.2->: id/"
921                                             + header.getRequestId()
922                                             + ": "
923                                             + header);
924
925                // NOTE: in the old code this used to be done conditionally:
926                // if (header.moreFragmentsToFollow()).
927                // Now we always put it in. We take it out when
928                // the response is done.
929                // This must happen now so if a header is fragmented the stream
930                // may be found.
931                connection.serverRequestMapPut(header.getRequestId(), this);
932            } finally {
933                // Leader/Follower.
934                // Note: This *MUST* come after putting stream in above map
935                // since the header may be fragmented and you do not want to
936                // start reading again until the map above is set.
937                setWorkThenPoolOrResumeSelect(header);
938            }
939            //inputObject.unmarshalHeader(); // done in subcontract.
940            getProtocolHandler().handleRequest(header, this);
941        } catch (Throwable t) {
942            if (transportDebug()) dprint(".REQUEST 1.2: id/"
943                                         + header.getRequestId()
944                                         + ": !!ERROR!!: "
945                                         + header,
946                                         t);
947            // Mask the exception from thread.;
948        } finally {
949            connection.serverRequestMapRemove(header.getRequestId());
950
951            if (transportDebug()) dprint(".REQUEST 1.2<-: id/"
952                                         + header.getRequestId()
953                                         + ": "
954                                         + header);
955        }
956    }
957
958    public void handleInput(ReplyMessage_1_0 header) throws IOException
959    {
960        try {
961            try {
962                if (transportDebug()) dprint(".REPLY 1.0->: " + header);
963                messageHeader = replyHeader = (ReplyMessage) header;
964                setInputObject();
965
966                // REVISIT: this should be done by waiting thread.
967                inputObject.unmarshalHeader();
968
969                signalResponseReceived();
970            } finally{
971                setWorkThenReadOrResumeSelect(header);
972            }
973        } catch (Throwable t) {
974            if (transportDebug())dprint(".REPLY 1.0: !!ERROR!!: " + header, t);
975            // Mask the exception from thread.;
976        } finally {
977            if (transportDebug()) dprint(".REPLY 1.0<-: " + header);
978        }
979    }
980
981    public void handleInput(ReplyMessage_1_1 header) throws IOException
982    {
983        try {
984            if (transportDebug()) dprint(".REPLY 1.1->: " + header);
985            messageHeader = replyHeader = (ReplyMessage) header;
986            setInputObject();
987
988            if (header.moreFragmentsToFollow()) {
989
990                // More fragments are coming to complete this reply, so keep
991                // a reference to the InputStream so we can add the fragments
992                connection.clientReply_1_1_Put(this);
993
994                // In 1.1, we can't assume that we have the request ID in the
995                // first fragment.  Thus, another thread is used
996                // to be the reader while this thread unmarshals
997                // the extended header and wakes up the client thread.
998                setWorkThenPoolOrResumeSelect(header);
999
1000                // REVISIT - error handling.
1001                // This must be done now.
1002                inputObject.unmarshalHeader();
1003
1004                signalResponseReceived();
1005
1006            } else {
1007
1008                // Not fragmented, therefore we know the request
1009                // ID is here.  Thus, we can unmarshal the extended header
1010                // and wake up the client thread without using a third
1011                // thread as above.
1012
1013                // REVISIT - error handling during unmarshal.
1014                // This must be done now to get the request id.
1015                inputObject.unmarshalHeader();
1016
1017                signalResponseReceived();
1018
1019                setWorkThenReadOrResumeSelect(header);
1020            }
1021        } catch (Throwable t) {
1022            if (transportDebug()) dprint(".REPLY 1.1: !!ERROR!!: " + header);
1023            // Mask the exception from thread.;
1024        } finally {
1025            if (transportDebug()) dprint(".REPLY 1.1<-: " + header);
1026        }
1027    }
1028
1029    public void handleInput(ReplyMessage_1_2 header) throws IOException
1030    {
1031        try {
1032            try {
1033                messageHeader = replyHeader = (ReplyMessage) header;
1034
1035                // We know that the request ID is in the first fragment
1036                header.unmarshalRequestID(dispatchByteBuffer);
1037
1038                if (transportDebug()) {
1039                    dprint(".REPLY 1.2->: id/"
1040                           + + header.getRequestId()
1041                           + ": more?: " + header.moreFragmentsToFollow()
1042                           + ": " + header);
1043                }
1044
1045                setInputObject();
1046
1047                signalResponseReceived();
1048            } finally {
1049                setWorkThenReadOrResumeSelect(header);
1050            }
1051        } catch (Throwable t) {
1052            if (transportDebug()) dprint(".REPLY 1.2: id/"
1053                                         + header.getRequestId()
1054                                         + ": !!ERROR!!: "
1055                                         + header, t);
1056            // Mask the exception from thread.;
1057        } finally {
1058            if (transportDebug()) dprint(".REPLY 1.2<-: id/"
1059                                         + header.getRequestId()
1060                                         + ": "
1061                                         + header);
1062        }
1063    }
1064
1065    public void handleInput(LocateRequestMessage_1_0 header) throws IOException
1066    {
1067        try {
1068            if (transportDebug())
1069                dprint(".LOCATE_REQUEST 1.0->: " + header);
1070            try {
1071                messageHeader = header;
1072                setInputObject();
1073            } finally {
1074                setWorkThenPoolOrResumeSelect(header);
1075            }
1076            getProtocolHandler().handleRequest(header, this);
1077        } catch (Throwable t) {
1078            if (transportDebug())
1079                dprint(".LOCATE_REQUEST 1.0: !!ERROR!!: " + header, t);
1080            // Mask the exception from thread.;
1081        } finally {
1082            if (transportDebug())
1083                dprint(".LOCATE_REQUEST 1.0<-: " + header);
1084        }
1085
1086    }
1087
1088    public void handleInput(LocateRequestMessage_1_1 header) throws IOException
1089    {
1090        try {
1091            if (transportDebug())
1092                dprint(".LOCATE_REQUEST 1.1->: " + header);
1093            try {
1094                messageHeader = header;
1095                setInputObject();
1096            } finally {
1097                setWorkThenPoolOrResumeSelect(header);
1098            }
1099            getProtocolHandler().handleRequest(header, this);
1100        } catch (Throwable t) {
1101            if (transportDebug())
1102                dprint(".LOCATE_REQUEST 1.1: !!ERROR!!: " + header, t);
1103            // Mask the exception from thread.;
1104        } finally {
1105            if (transportDebug())
1106                dprint(".LOCATE_REQUEST 1.1<-:" + header);
1107        }
1108    }
1109
1110    public void handleInput(LocateRequestMessage_1_2 header) throws IOException
1111    {
1112        try {
1113            try {
1114                messageHeader = header;
1115
1116                header.unmarshalRequestID(dispatchByteBuffer);
1117                setInputObject();
1118
1119                if (transportDebug())
1120                    dprint(".LOCATE_REQUEST 1.2->: id/"
1121                           + header.getRequestId()
1122                           + ": "
1123                           + header);
1124
1125                if (header.moreFragmentsToFollow()) {
1126                    connection.serverRequestMapPut(header.getRequestId(),this);
1127                }
1128            } finally {
1129                setWorkThenPoolOrResumeSelect(header);
1130            }
1131            getProtocolHandler().handleRequest(header, this);
1132        } catch (Throwable t) {
1133            if (transportDebug())
1134                dprint(".LOCATE_REQUEST 1.2: id/"
1135                       + header.getRequestId()
1136                       + ": !!ERROR!!: "
1137                       + header, t);
1138            // Mask the exception from thread.;
1139        } finally {
1140            if (transportDebug())
1141                dprint(".LOCATE_REQUEST 1.2<-: id/"
1142                       + header.getRequestId()
1143                       + ": "
1144                       + header);
1145        }
1146    }
1147
1148    public void handleInput(LocateReplyMessage_1_0 header) throws IOException
1149    {
1150        try {
1151            if (transportDebug())
1152                dprint(".LOCATE_REPLY 1.0->:" + header);
1153            try {
1154                messageHeader = header;
1155                setInputObject();
1156                inputObject.unmarshalHeader(); // REVISIT Put in subcontract.
1157                signalResponseReceived();
1158            } finally {
1159                setWorkThenReadOrResumeSelect(header);
1160            }
1161        } catch (Throwable t) {
1162            if (transportDebug())
1163                dprint(".LOCATE_REPLY 1.0: !!ERROR!!: " + header, t);
1164            // Mask the exception from thread.;
1165        } finally {
1166            if (transportDebug())
1167                dprint(".LOCATE_REPLY 1.0<-: " + header);
1168        }
1169    }
1170
1171    public void handleInput(LocateReplyMessage_1_1 header) throws IOException
1172    {
1173        try {
1174            if (transportDebug()) dprint(".LOCATE_REPLY 1.1->: " + header);
1175            try {
1176                messageHeader = header;
1177                setInputObject();
1178                // Fragmented LocateReplies are not allowed in 1.1.
1179                inputObject.unmarshalHeader();
1180                signalResponseReceived();
1181            } finally {
1182                setWorkThenReadOrResumeSelect(header);
1183            }
1184        } catch (Throwable t) {
1185            if (transportDebug())
1186                dprint(".LOCATE_REPLY 1.1: !!ERROR!!: " + header, t);
1187            // Mask the exception from thread.;
1188        } finally {
1189            if (transportDebug()) dprint(".LOCATE_REPLY 1.1<-: " + header);
1190        }
1191    }
1192
1193    public void handleInput(LocateReplyMessage_1_2 header) throws IOException
1194    {
1195        try {
1196            try {
1197                messageHeader = header;
1198
1199                // No need to put in client reply map - already there.
1200                header.unmarshalRequestID(dispatchByteBuffer);
1201
1202                setInputObject();
1203
1204                if (transportDebug()) dprint(".LOCATE_REPLY 1.2->: id/"
1205                                             + header.getRequestId()
1206                                             + ": "
1207                                             + header);
1208
1209                signalResponseReceived();
1210            } finally {
1211                setWorkThenPoolOrResumeSelect(header); // REVISIT
1212            }
1213        } catch (Throwable t) {
1214            if (transportDebug())
1215                dprint(".LOCATE_REPLY 1.2: id/"
1216                       + header.getRequestId()
1217                       + ": !!ERROR!!: "
1218                       + header, t);
1219            // Mask the exception from thread.;
1220        } finally {
1221            if (transportDebug()) dprint(".LOCATE_REPLY 1.2<-: id/"
1222                                         + header.getRequestId()
1223                                         + ": "
1224                                         + header);
1225        }
1226    }
1227
1228    public void handleInput(FragmentMessage_1_1 header) throws IOException
1229    {
1230        try {
1231            if (transportDebug()) {
1232                dprint(".FRAGMENT 1.1->: "
1233                       + "more?: " + header.moreFragmentsToFollow()
1234                       + ": " + header);
1235            }
1236            try {
1237                messageHeader = header;
1238                MessageMediator mediator = null;
1239                CDRInputObject inputObject = null;
1240
1241                if (connection.isServer()) {
1242                    mediator = connection.serverRequest_1_1_Get();
1243                } else {
1244                    mediator = connection.clientReply_1_1_Get();
1245                }
1246                if (mediator != null) {
1247                    inputObject = (CDRInputObject) mediator.getInputObject();
1248                }
1249
1250                // If no input stream available, then discard the fragment.
1251                // This can happen:
1252                // 1. if a fragment message is received prior to receiving
1253                //    the original request/reply message. Very unlikely.
1254                // 2. if a fragment message is received after the
1255                //    reply has been sent (early replies)
1256                // Note: In the case of early replies, the fragments received
1257                // during the request processing (which are never unmarshaled),
1258                // will eventually be discarded by the GC.
1259                if (inputObject == null) {
1260                    if (transportDebug())
1261                        dprint(".FRAGMENT 1.1: ++++DISCARDING++++: " + header);
1262                    // need to release dispatchByteBuffer to pool if
1263                    // we are discarding
1264                    releaseByteBufferToPool();
1265                    return;
1266                }
1267
1268                inputObject.getBufferManager()
1269                    .processFragment(dispatchByteBuffer, header);
1270
1271                if (! header.moreFragmentsToFollow()) {
1272                    if (connection.isServer()) {
1273                        connection.serverRequest_1_1_Remove();
1274                    } else {
1275                        connection.clientReply_1_1_Remove();
1276                    }
1277                }
1278            } finally {
1279                // NOTE: This *must* come after queing the fragment
1280                // when using the selector to ensure fragments stay in order.
1281                setWorkThenReadOrResumeSelect(header);
1282            }
1283        } catch (Throwable t) {
1284            if (transportDebug())
1285                dprint(".FRAGMENT 1.1: !!ERROR!!: " + header, t);
1286            // Mask the exception from thread.;
1287        } finally {
1288            if (transportDebug()) dprint(".FRAGMENT 1.1<-: " + header);
1289        }
1290    }
1291
1292    public void handleInput(FragmentMessage_1_2 header) throws IOException
1293    {
1294        try {
1295            try {
1296                messageHeader = header;
1297
1298                // Note:  We know it's a 1.2 fragment, we have the data, but
1299                // we need the IIOPInputStream instance to unmarshal the
1300                // request ID... but we need the request ID to get the
1301                // IIOPInputStream instance. So we peek at the raw bytes.
1302
1303                header.unmarshalRequestID(dispatchByteBuffer);
1304
1305                if (transportDebug()) {
1306                    dprint(".FRAGMENT 1.2->: id/"
1307                           + header.getRequestId()
1308                           + ": more?: " + header.moreFragmentsToFollow()
1309                           + ": " + header);
1310                }
1311
1312                MessageMediator mediator = null;
1313                InputObject inputObject = null;
1314
1315                if (connection.isServer()) {
1316                    mediator =
1317                        connection.serverRequestMapGet(header.getRequestId());
1318                } else {
1319                    mediator =
1320                        connection.clientRequestMapGet(header.getRequestId());
1321                }
1322                if (mediator != null) {
1323                    inputObject = mediator.getInputObject();
1324                }
1325                // See 1.1 comments.
1326                if (inputObject == null) {
1327                    if (transportDebug()) {
1328                        dprint(".FRAGMENT 1.2: id/"
1329                               + header.getRequestId()
1330                               + ": ++++DISCARDING++++: "
1331                               + header);
1332                    }
1333                    // need to release dispatchByteBuffer to pool if
1334                    // we are discarding
1335                    releaseByteBufferToPool();
1336                    return;
1337                }
1338                ((CDRInputObject)inputObject)
1339                    .getBufferManager().processFragment(
1340                                     dispatchByteBuffer, header);
1341
1342                // REVISIT: but if it is a server don't you have to remove the
1343                // stream from the map?
1344                if (! connection.isServer()) {
1345                    /* REVISIT
1346                     * No need to do anything.
1347                     * Should we mark that last was received?
1348                     if (! header.moreFragmentsToFollow()) {
1349                     // Last fragment.
1350                     }
1351                    */
1352                }
1353            } finally {
1354                // NOTE: This *must* come after queing the fragment
1355                // when using the selector to ensure fragments stay in order.
1356                setWorkThenReadOrResumeSelect(header);
1357            }
1358        } catch (Throwable t) {
1359            if (transportDebug())
1360                dprint(".FRAGMENT 1.2: id/"
1361                       + header.getRequestId()
1362                       + ": !!ERROR!!: "
1363                       + header, t);
1364            // Mask the exception from thread.;
1365        } finally {
1366            if (transportDebug()) dprint(".FRAGMENT 1.2<-: id/"
1367                                         + header.getRequestId()
1368                                         + ": "
1369                                         + header);
1370        }
1371    }
1372
1373    public void handleInput(CancelRequestMessage header) throws IOException
1374    {
1375        try {
1376            try {
1377                messageHeader = header;
1378                setInputObject();
1379
1380                // REVISIT: Move these two to subcontract.
1381                inputObject.unmarshalHeader();
1382
1383                if (transportDebug()) dprint(".CANCEL->: id/"
1384                                             + header.getRequestId() + ": "
1385                                             + header.getGIOPVersion() + ": "
1386                                             + header);
1387
1388                processCancelRequest(header.getRequestId());
1389                releaseByteBufferToPool();
1390            } finally {
1391                setWorkThenReadOrResumeSelect(header);
1392            }
1393        } catch (Throwable t) {
1394            if (transportDebug()) dprint(".CANCEL: id/"
1395                                         + header.getRequestId()
1396                                         + ": !!ERROR!!: "
1397                                         + header, t);
1398            // Mask the exception from thread.;
1399        } finally {
1400            if (transportDebug()) dprint(".CANCEL<-: id/"
1401                                         + header.getRequestId() + ": "
1402                                         + header.getGIOPVersion() + ": "
1403                                         + header);
1404        }
1405    }
1406
1407    private void throwNotImplemented()
1408    {
1409        isThreadDone = false;
1410        throwNotImplemented("");
1411    }
1412
1413    private void throwNotImplemented(String msg)
1414    {
1415        throw new RuntimeException("CorbaMessageMediatorImpl: not implemented " + msg);
1416    }
1417
1418    private void dprint(String msg, Throwable t)
1419    {
1420        dprint(msg);
1421        t.printStackTrace(System.out);
1422    }
1423
1424    private void dprint(String msg)
1425    {
1426        ORBUtility.dprint("CorbaMessageMediatorImpl", msg);
1427    }
1428
1429    protected String opAndId(CorbaMessageMediator mediator)
1430    {
1431        return ORBUtility.operationNameAndRequestId(mediator);
1432    }
1433
1434    private boolean transportDebug()
1435    {
1436        return orb.transportDebugFlag;
1437    }
1438
1439    // REVISIT: move this to subcontract (but both client and server need it).
1440    private final void processCancelRequest(int cancelReqId) {
1441
1442        // The GIOP version of CancelRequest does not matter, since
1443        // CancelRequest_1_0 could be sent to cancel a request which
1444        // has a different GIOP version.
1445
1446        /*
1447         * CancelRequest processing logic :
1448         *
1449         *  - find the request with matching requestId
1450         *
1451         *  - call cancelProcessing() in BufferManagerRead [BMR]
1452         *
1453         *  - the hope is that worker thread would call BMR.underflow()
1454         *    to wait for more fragments to come in. When BMR.underflow() is
1455         *    called, if a CancelRequest had already arrived,
1456         *    the worker thread would throw ThreadDeath,
1457         *    else the thread would wait to be notified of the
1458         *    arrival of a new fragment or CancelRequest. Upon notification,
1459         *    the woken up thread would check to see if a CancelRequest had
1460         *    arrived and if so throw a ThreadDeath or it will continue to
1461         *    process the received fragment.
1462         *
1463         *  - if all the fragments had been received prior to CancelRequest
1464         *    then the worker thread would never block in BMR.underflow().
1465         *    So, setting the abort flag in BMR has no effect. The request
1466         *    processing will complete normally.
1467         *
1468         *  - in the case where the server has received enough fragments to
1469         *    start processing the request and the server sends out
1470         *    an early reply. In such a case if the CancelRequest arrives
1471         *    after the reply has been sent, it has no effect.
1472         */
1473
1474        if (!connection.isServer()) {
1475            return; // we do not support bi-directional giop yet, ignore.
1476        }
1477
1478        // Try to get hold of the InputStream buffer.
1479        // In the case of 1.0 requests there is no way to get hold of
1480        // InputStream. Try out the 1.1 and 1.2 cases.
1481
1482        // was the request 1.2 ?
1483        MessageMediator mediator = connection.serverRequestMapGet(cancelReqId);
1484        int requestId ;
1485        if (mediator == null) {
1486            // was the request 1.1 ?
1487            mediator = connection.serverRequest_1_1_Get();
1488            if (mediator == null) {
1489                // XXX log this!
1490                // either the request was 1.0
1491                // or an early reply has already been sent
1492                // or request processing is over
1493                // or its a spurious CancelRequest
1494                return; // do nothing.
1495            }
1496
1497            requestId = ((CorbaMessageMediator) mediator).getRequestId();
1498
1499            if (requestId != cancelReqId) {
1500                // A spurious 1.1 CancelRequest has been received.
1501                // XXX log this!
1502                return; // do nothing
1503            }
1504
1505            if (requestId == 0) { // special case
1506                // XXX log this
1507                // this means that
1508                // 1. the 1.1 requests' requestId has not been received
1509                //    i.e., a CancelRequest was received even before the
1510                //    1.1 request was received. The spec disallows this.
1511                // 2. or the 1.1 request has a requestId 0.
1512                //
1513                // It is a little tricky to distinguish these two. So, be
1514                // conservative and do not cancel the request. Downside is that
1515                // 1.1 requests with requestId of 0 will never be cancelled.
1516                return; // do nothing
1517            }
1518        } else {
1519            requestId = ((CorbaMessageMediator) mediator).getRequestId();
1520        }
1521
1522        Message msg = ((CorbaMessageMediator)mediator).getRequestHeader();
1523        if (msg.getType() != Message.GIOPRequest) {
1524            // Any mediator obtained here should only ever be for a GIOP
1525            // request.
1526            wrapper.badMessageTypeForCancel() ;
1527        }
1528
1529        // At this point we have a valid message mediator that contains
1530        // a valid requestId.
1531
1532        // at this point we have chosen a request to be cancelled. But we
1533        // do not know if the target object's method has been invoked or not.
1534        // Request input stream being available simply means that the request
1535        // processing is not over yet. simply set the abort flag in the
1536        // BMRS and hope that the worker thread would notice it (this can
1537        // happen only if the request stream is being unmarshalled and the
1538        // target's method has not been invoked yet). This guarantees
1539        // that the requests which have been dispatched to the
1540        // target's method will never be cancelled.
1541
1542        BufferManagerReadStream bufferManager = (BufferManagerReadStream)
1543            ((CDRInputObject)mediator.getInputObject()).getBufferManager();
1544        bufferManager.cancelProcessing(cancelReqId);
1545    }
1546
1547    ////////////////////////////////////////////////////
1548    //
1549    // spi.protocol.CorbaProtocolHandler
1550    //
1551
1552    public void handleRequest(RequestMessage msg,
1553                              CorbaMessageMediator messageMediator)
1554    {
1555        try {
1556            beginRequest(messageMediator);
1557            try {
1558                handleRequestRequest(messageMediator);
1559                if (messageMediator.isOneWay()) {
1560                    return;
1561                }
1562            } catch (Throwable t) {
1563                if (messageMediator.isOneWay()) {
1564                    return;
1565                }
1566                handleThrowableDuringServerDispatch(
1567                    messageMediator, t, CompletionStatus.COMPLETED_MAYBE);
1568            }
1569            sendResponse(messageMediator);
1570        } catch (Throwable t) {
1571            dispatchError(messageMediator, "RequestMessage", t);
1572        } finally {
1573            endRequest(messageMediator);
1574        }
1575    }
1576
1577    public void handleRequest(LocateRequestMessage msg,
1578                              CorbaMessageMediator messageMediator)
1579    {
1580        try {
1581            beginRequest(messageMediator);
1582            try {
1583                handleLocateRequest(messageMediator);
1584            } catch (Throwable t) {
1585                handleThrowableDuringServerDispatch(
1586                    messageMediator, t, CompletionStatus.COMPLETED_MAYBE);
1587            }
1588            sendResponse(messageMediator);
1589        } catch (Throwable t) {
1590            dispatchError(messageMediator, "LocateRequestMessage", t);
1591        } finally {
1592            endRequest(messageMediator);
1593        }
1594    }
1595
1596    private void beginRequest(CorbaMessageMediator messageMediator)
1597    {
1598        ORB orb = (ORB) messageMediator.getBroker();
1599        if (orb.subcontractDebugFlag) {
1600            dprint(".handleRequest->:");
1601        }
1602        connection.serverRequestProcessingBegins();
1603    }
1604
1605    private void dispatchError(CorbaMessageMediator messageMediator,
1606                               String msg, Throwable t)
1607    {
1608        if (orb.subcontractDebugFlag) {
1609            dprint(".handleRequest: " + opAndId(messageMediator)
1610                   + ": !!ERROR!!: "
1611                   + msg,
1612                   t);
1613        }
1614        // REVISIT - this makes hcks sendTwoObjects fail
1615        // messageMediator.getConnection().close();
1616    }
1617
1618    private void sendResponse(CorbaMessageMediator messageMediator)
1619    {
1620        if (orb.subcontractDebugFlag) {
1621            dprint(".handleRequest: " + opAndId(messageMediator)
1622                   + ": sending response");
1623        }
1624        // REVISIT - type and location
1625        CDROutputObject outputObject = (CDROutputObject)
1626            messageMediator.getOutputObject();
1627        if (outputObject != null) {
1628            // REVISIT - can be null for TRANSIENT below.
1629            outputObject.finishSendingMessage();
1630        }
1631    }
1632
1633    private void endRequest(CorbaMessageMediator messageMediator)
1634    {
1635        ORB orb = (ORB) messageMediator.getBroker();
1636        if (orb.subcontractDebugFlag) {
1637            dprint(".handleRequest<-: " + opAndId(messageMediator));
1638        }
1639
1640        // release NIO ByteBuffers to ByteBufferPool
1641
1642        try {
1643            OutputObject outputObj = messageMediator.getOutputObject();
1644            if (outputObj != null) {
1645                outputObj.close();
1646            }
1647            InputObject inputObj = messageMediator.getInputObject();
1648            if (inputObj != null) {
1649                inputObj.close();
1650            }
1651        } catch (IOException ex) {
1652            // Given what close() does, this catch shouldn't ever happen.
1653            // See CDRInput/OutputObject.close() for more info.
1654            // It also won't result in a Corba error if an IOException happens.
1655            if (orb.subcontractDebugFlag) {
1656                dprint(".endRequest: IOException:" + ex.getMessage(), ex);
1657            }
1658        } finally {
1659            ((CorbaConnection)messageMediator.getConnection()).serverRequestProcessingEnds();
1660        }
1661    }
1662
1663    protected void handleRequestRequest(CorbaMessageMediator messageMediator)
1664    {
1665        // Does nothing if already unmarshaled.
1666        ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();
1667
1668        ORB orb = (ORB)messageMediator.getBroker();
1669        synchronized (orb) {
1670            orb.checkShutdownState();
1671        }
1672
1673        ObjectKey okey = messageMediator.getObjectKey();
1674        if (orb.subcontractDebugFlag) {
1675            ObjectKeyTemplate oktemp = okey.getTemplate() ;
1676            dprint( ".handleRequest: " + opAndId(messageMediator)
1677                    + ": dispatching to scid: " + oktemp.getSubcontractId());
1678        }
1679
1680        CorbaServerRequestDispatcher sc = okey.getServerRequestDispatcher(orb);
1681
1682        if (orb.subcontractDebugFlag) {
1683            dprint(".handleRequest: " + opAndId(messageMediator)
1684                   + ": dispatching to sc: " + sc);
1685        }
1686
1687        if (sc == null) {
1688            throw wrapper.noServerScInDispatch() ;
1689        }
1690
1691        // NOTE:
1692        // This is necessary so mediator can act as ResponseHandler
1693        // and pass necessary info to response constructors located
1694        // in the subcontract.
1695        // REVISIT - same class right now.
1696        //messageMediator.setProtocolHandler(this);
1697
1698        try {
1699            orb.startingDispatch();
1700            sc.dispatch(messageMediator);
1701        } finally {
1702            orb.finishedDispatch();
1703        }
1704    }
1705
1706    protected void handleLocateRequest(CorbaMessageMediator messageMediator)
1707    {
1708        ORB orb = (ORB)messageMediator.getBroker();
1709        LocateRequestMessage msg = (LocateRequestMessage)
1710            messageMediator.getDispatchHeader();
1711        IOR ior = null;
1712        LocateReplyMessage reply = null;
1713        short addrDisp = -1;
1714
1715        try {
1716            ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();
1717            CorbaServerRequestDispatcher sc =
1718                msg.getObjectKey().getServerRequestDispatcher( orb ) ;
1719            if (sc == null) {
1720                return;
1721            }
1722
1723            ior = sc.locate(msg.getObjectKey());
1724
1725            if ( ior == null ) {
1726                reply = MessageBase.createLocateReply(
1727                            orb, msg.getGIOPVersion(),
1728                            msg.getEncodingVersion(),
1729                            msg.getRequestId(),
1730                            LocateReplyMessage.OBJECT_HERE, null);
1731
1732            } else {
1733                reply = MessageBase.createLocateReply(
1734                            orb, msg.getGIOPVersion(),
1735                            msg.getEncodingVersion(),
1736                            msg.getRequestId(),
1737                            LocateReplyMessage.OBJECT_FORWARD, ior);
1738            }
1739            // REVISIT: Should we catch SystemExceptions?
1740
1741        } catch (AddressingDispositionException ex) {
1742
1743            // create a response containing the expected target
1744            // addressing disposition.
1745
1746            reply = MessageBase.createLocateReply(
1747                        orb, msg.getGIOPVersion(),
1748                        msg.getEncodingVersion(),
1749                        msg.getRequestId(),
1750                        LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE, null);
1751
1752            addrDisp = ex.expectedAddrDisp();
1753
1754        } catch (RequestCanceledException ex) {
1755
1756            return; // no need to send reply
1757
1758        } catch ( Exception ex ) {
1759
1760            // REVISIT If exception is not OBJECT_NOT_EXIST, it should
1761            // have a different reply
1762
1763            // This handles OBJECT_NOT_EXIST exceptions thrown in
1764            // the subcontract or obj manager. Send back UNKNOWN_OBJECT.
1765
1766            reply = MessageBase.createLocateReply(
1767                        orb, msg.getGIOPVersion(),
1768                        msg.getEncodingVersion(),
1769                        msg.getRequestId(),
1770                        LocateReplyMessage.UNKNOWN_OBJECT, null);
1771        }
1772
1773        CDROutputObject outputObject =
1774            createAppropriateOutputObject(messageMediator,
1775                                          msg, reply);
1776        messageMediator.setOutputObject(outputObject);
1777        outputObject.setMessageMediator(messageMediator);
1778
1779        reply.write(outputObject);
1780        // outputObject.setMessage(reply); // REVISIT - not necessary
1781        if (ior != null) {
1782            ior.write(outputObject);
1783        }
1784        if (addrDisp != -1) {
1785            AddressingDispositionHelper.write(outputObject, addrDisp);
1786        }
1787    }
1788
1789    private CDROutputObject createAppropriateOutputObject(
1790        CorbaMessageMediator messageMediator,
1791        Message msg, LocateReplyMessage reply)
1792    {
1793        CDROutputObject outputObject;
1794
1795        if (msg.getGIOPVersion().lessThan(GIOPVersion.V1_2)) {
1796            // locate msgs 1.0 & 1.1 :=> grow,
1797            outputObject = sun.corba.OutputStreamFactory.newCDROutputObject(
1798                             (ORB) messageMediator.getBroker(),
1799                             this,
1800                             GIOPVersion.V1_0,
1801                             (CorbaConnection) messageMediator.getConnection(),
1802                             reply,
1803                             ORBConstants.STREAM_FORMAT_VERSION_1);
1804        } else {
1805            // 1.2 :=> stream
1806            outputObject = sun.corba.OutputStreamFactory.newCDROutputObject(
1807                             (ORB) messageMediator.getBroker(),
1808                             messageMediator,
1809                             reply,
1810                             ORBConstants.STREAM_FORMAT_VERSION_1);
1811        }
1812        return outputObject;
1813    }
1814
1815    public void handleThrowableDuringServerDispatch(
1816        CorbaMessageMediator messageMediator,
1817        Throwable throwable,
1818        CompletionStatus completionStatus)
1819    {
1820        if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
1821            dprint(".handleThrowableDuringServerDispatch: "
1822                   + opAndId(messageMediator) + ": "
1823                   + throwable);
1824        }
1825
1826        // If we haven't unmarshaled the header, we probably don't
1827        // have enough information to even send back a reply.
1828
1829        // REVISIT
1830        // Cannot do this check.  When target addressing disposition does
1831        // not match (during header unmarshaling) it throws an exception
1832        // to be handled here.
1833        /*
1834        if (! ((CDRInputObject)messageMediator.getInputObject())
1835            .unmarshaledHeader()) {
1836            return;
1837        }
1838        */
1839        handleThrowableDuringServerDispatch(messageMediator,
1840                                            throwable,
1841                                            completionStatus,
1842                                            1);
1843    }
1844
1845
1846    // REVISIT - catch and ignore RequestCanceledException.
1847
1848    protected void handleThrowableDuringServerDispatch(
1849        CorbaMessageMediator messageMediator,
1850        Throwable throwable,
1851        CompletionStatus completionStatus,
1852        int iteration)
1853    {
1854        if (iteration > 10) {
1855            if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
1856                dprint(".handleThrowableDuringServerDispatch: "
1857                       + opAndId(messageMediator)
1858                       + ": cannot handle: "
1859                       + throwable);
1860            }
1861
1862            // REVISIT - should we close connection?
1863            RuntimeException rte =
1864                new RuntimeException("handleThrowableDuringServerDispatch: " +
1865                                     "cannot create response.");
1866            rte.initCause(throwable);
1867            throw rte;
1868        }
1869
1870        try {
1871            if (throwable instanceof ForwardException) {
1872                ForwardException fex = (ForwardException)throwable ;
1873                createLocationForward( messageMediator, fex.getIOR(), null ) ;
1874                return;
1875            }
1876
1877            if (throwable instanceof AddressingDispositionException) {
1878                handleAddressingDisposition(
1879                    messageMediator,
1880                    (AddressingDispositionException)throwable);
1881                return;
1882            }
1883
1884            // Else.
1885
1886            SystemException sex =
1887                convertThrowableToSystemException(throwable, completionStatus);
1888
1889            createSystemExceptionResponse(messageMediator, sex, null);
1890            return;
1891
1892        } catch (Throwable throwable2) {
1893
1894            // User code (e.g., postinvoke, interceptors) may change
1895            // the exception, so we end up back here.
1896            // Report the changed exception.
1897
1898            handleThrowableDuringServerDispatch(messageMediator,
1899                                                throwable2,
1900                                                completionStatus,
1901                                                iteration + 1);
1902            return;
1903        }
1904    }
1905
1906    protected SystemException convertThrowableToSystemException(
1907        Throwable throwable,
1908        CompletionStatus completionStatus)
1909    {
1910        if (throwable instanceof SystemException) {
1911            return (SystemException)throwable;
1912        }
1913
1914        if (throwable instanceof RequestCanceledException) {
1915            // Reporting an exception response causes the
1916            // poa current stack, the interceptor stacks, etc.
1917            // to be balanced.  It also notifies interceptors
1918            // that the request was cancelled.
1919
1920            return wrapper.requestCanceled( throwable ) ;
1921        }
1922
1923        // NOTE: We do not trap ThreadDeath above Throwable.
1924        // There is no reason to stop the thread.  It is
1925        // just a worker thread.  The ORB never throws
1926        // ThreadDeath.  Client code may (e.g., in ServantManagers,
1927        // interceptors, or servants) but that should not
1928        // effect the ORB threads.  So it is just handled
1929        // generically.
1930
1931        //
1932        // Last resort.
1933        // If user code throws a non-SystemException report it generically.
1934        //
1935
1936        return wrapper.runtimeexception( CompletionStatus.COMPLETED_MAYBE, throwable ) ;
1937    }
1938
1939    protected void handleAddressingDisposition(
1940        CorbaMessageMediator messageMediator,
1941        AddressingDispositionException ex)
1942    {
1943
1944        short addrDisp = -1;
1945
1946        // from iiop.RequestProcessor.
1947
1948        // Respond with expected target addressing disposition.
1949
1950        switch (messageMediator.getRequestHeader().getType()) {
1951        case Message.GIOPRequest :
1952            ReplyMessage replyHeader = MessageBase.createReply(
1953                          (ORB)messageMediator.getBroker(),
1954                          messageMediator.getGIOPVersion(),
1955                          messageMediator.getEncodingVersion(),
1956                          messageMediator.getRequestId(),
1957                          ReplyMessage.NEEDS_ADDRESSING_MODE,
1958                          null, null);
1959            // REVISIT: via acceptor factory.
1960            CDROutputObject outputObject =
1961                sun.corba.OutputStreamFactory.newCDROutputObject(
1962                (ORB)messageMediator.getBroker(),
1963                this,
1964                messageMediator.getGIOPVersion(),
1965                (CorbaConnection)messageMediator.getConnection(),
1966                replyHeader,
1967                ORBConstants.STREAM_FORMAT_VERSION_1);
1968            messageMediator.setOutputObject(outputObject);
1969            outputObject.setMessageMediator(messageMediator);
1970            replyHeader.write(outputObject);
1971            AddressingDispositionHelper.write(outputObject,
1972                                              ex.expectedAddrDisp());
1973            return;
1974
1975        case Message.GIOPLocateRequest :
1976            LocateReplyMessage locateReplyHeader = MessageBase.createLocateReply(
1977                (ORB)messageMediator.getBroker(),
1978                messageMediator.getGIOPVersion(),
1979                messageMediator.getEncodingVersion(),
1980                messageMediator.getRequestId(),
1981                LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE,
1982                null);
1983
1984            addrDisp = ex.expectedAddrDisp();
1985
1986            // REVISIT: via acceptor factory.
1987            outputObject =
1988                createAppropriateOutputObject(messageMediator,
1989                                              messageMediator.getRequestHeader(),
1990                                              locateReplyHeader);
1991            messageMediator.setOutputObject(outputObject);
1992            outputObject.setMessageMediator(messageMediator);
1993            locateReplyHeader.write(outputObject);
1994            IOR ior = null;
1995            if (ior != null) {
1996                ior.write(outputObject);
1997            }
1998            if (addrDisp != -1) {
1999                AddressingDispositionHelper.write(outputObject, addrDisp);
2000            }
2001            return;
2002        }
2003    }
2004
2005    public CorbaMessageMediator createResponse(
2006        CorbaMessageMediator messageMediator,
2007        ServiceContexts svc)
2008    {
2009        // REVISIT: ignore service contexts during framework transition.
2010        // They are set in SubcontractResponseHandler to the wrong connection.
2011        // Then they would be set again here and a duplicate contexts
2012        // exception occurs.
2013        return createResponseHelper(
2014            messageMediator,
2015            getServiceContextsForReply(messageMediator, null));
2016    }
2017
2018    public CorbaMessageMediator createUserExceptionResponse(
2019        CorbaMessageMediator messageMediator, ServiceContexts svc)
2020    {
2021        // REVISIT - same as above
2022        return createResponseHelper(
2023            messageMediator,
2024            getServiceContextsForReply(messageMediator, null),
2025            true);
2026    }
2027
2028    public CorbaMessageMediator createUnknownExceptionResponse(
2029        CorbaMessageMediator messageMediator, UnknownException ex)
2030    {
2031        // NOTE: This service context container gets augmented in
2032        // tail call.
2033        ServiceContexts contexts = null;
2034        SystemException sys = new UNKNOWN( 0,
2035            CompletionStatus.COMPLETED_MAYBE);
2036        contexts = new ServiceContexts( (ORB)messageMediator.getBroker() );
2037        UEInfoServiceContext uei = new UEInfoServiceContext(sys);
2038        contexts.put( uei ) ;
2039        return createSystemExceptionResponse(messageMediator, sys, contexts);
2040    }
2041
2042    public CorbaMessageMediator createSystemExceptionResponse(
2043        CorbaMessageMediator messageMediator,
2044        SystemException ex,
2045        ServiceContexts svc)
2046    {
2047        if (messageMediator.getConnection() != null) {
2048            // It is possible that fragments of response have already been
2049            // sent.  Then an error may occur (e.g. marshaling error like
2050            // non serializable object).  In that case it is too late
2051            // to send the exception.  We just return the existing fragmented
2052            // stream here.  This will cause an incomplete last fragment
2053            // to be sent.  Then the other side will get a marshaling error
2054            // when attempting to unmarshal.
2055
2056            // REVISIT: Impl - make interface method to do the following.
2057            CorbaMessageMediatorImpl mediator = (CorbaMessageMediatorImpl)
2058                ((CorbaConnection)messageMediator.getConnection())
2059                .serverRequestMapGet(messageMediator.getRequestId());
2060
2061            OutputObject existingOutputObject = null;
2062            if (mediator != null) {
2063                existingOutputObject = mediator.getOutputObject();
2064            }
2065
2066            // REVISIT: need to think about messageMediator containing correct
2067            // pointer to output object.
2068            if (existingOutputObject != null &&
2069                mediator.sentFragment() &&
2070                ! mediator.sentFullMessage())
2071            {
2072                return mediator;
2073            }
2074        }
2075
2076        // Only do this if interceptors have been initialized on this request
2077        // and have not completed their lifecycle (otherwise the info stack
2078        // may be empty or have a different request's entry on top).
2079        if (messageMediator.executePIInResponseConstructor()) {
2080            // REVISIT: not necessary in framework now?
2081            // Inform Portable Interceptors of the SystemException.  This is
2082            // required to be done here because the ending interception point
2083            // is called in the when creating the response below
2084            // but we do not currently write the SystemException into the
2085            // response until after the ending point is called.
2086            ((ORB)messageMediator.getBroker()).getPIHandler().setServerPIInfo( ex );
2087        }
2088
2089        if (((ORB)messageMediator.getBroker()).subcontractDebugFlag &&
2090            ex != null)
2091        {
2092            dprint(".createSystemExceptionResponse: "
2093                   + opAndId(messageMediator),
2094                   ex);
2095        }
2096
2097        ServiceContexts serviceContexts =
2098            getServiceContextsForReply(messageMediator, svc);
2099
2100        // NOTE: We MUST add the service context before creating
2101        // the response since service contexts are written to the
2102        // stream when the response object is created.
2103
2104        addExceptionDetailMessage(messageMediator, ex, serviceContexts);
2105
2106        CorbaMessageMediator response =
2107            createResponseHelper(messageMediator, serviceContexts, false);
2108
2109        // NOTE: From here on, it is too late to add more service contexts.
2110        // They have already been serialized to the stream (and maybe fragments
2111        // sent).
2112
2113        ORBUtility.writeSystemException(
2114            ex, (OutputStream)response.getOutputObject());
2115
2116        return response;
2117    }
2118
2119    private void addExceptionDetailMessage(CorbaMessageMediator mediator,
2120                                           SystemException ex,
2121                                           ServiceContexts serviceContexts)
2122    {
2123        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2124        PrintWriter pw = new PrintWriter(baos);
2125        ex.printStackTrace(pw);
2126        pw.flush(); // NOTE: you must flush or baos will be empty.
2127        EncapsOutputStream encapsOutputStream =
2128            sun.corba.OutputStreamFactory.newEncapsOutputStream((ORB)mediator.getBroker());
2129        encapsOutputStream.putEndian();
2130        encapsOutputStream.write_wstring(baos.toString());
2131        UnknownServiceContext serviceContext =
2132            new UnknownServiceContext(ExceptionDetailMessage.value,
2133                                      encapsOutputStream.toByteArray());
2134        serviceContexts.put(serviceContext);
2135    }
2136
2137    public CorbaMessageMediator createLocationForward(
2138        CorbaMessageMediator messageMediator, IOR ior, ServiceContexts svc)
2139    {
2140        ReplyMessage reply
2141            = MessageBase.createReply(
2142                  (ORB)messageMediator.getBroker(),
2143                  messageMediator.getGIOPVersion(),
2144                  messageMediator.getEncodingVersion(),
2145                  messageMediator.getRequestId(),
2146                  ReplyMessage.LOCATION_FORWARD,
2147                  getServiceContextsForReply(messageMediator, svc),
2148                  ior);
2149
2150        return createResponseHelper(messageMediator, reply, ior);
2151    }
2152
2153    protected CorbaMessageMediator createResponseHelper(
2154        CorbaMessageMediator messageMediator, ServiceContexts svc)
2155    {
2156        ReplyMessage message =
2157            MessageBase.createReply(
2158                (ORB)messageMediator.getBroker(),
2159                messageMediator.getGIOPVersion(),
2160                messageMediator.getEncodingVersion(),
2161                messageMediator.getRequestId(),
2162                ReplyMessage.NO_EXCEPTION,
2163                svc,
2164                null);
2165        return createResponseHelper(messageMediator, message, null);
2166    }
2167
2168    protected CorbaMessageMediator createResponseHelper(
2169        CorbaMessageMediator messageMediator, ServiceContexts svc,boolean user)
2170    {
2171        ReplyMessage message =
2172            MessageBase.createReply(
2173                (ORB)messageMediator.getBroker(),
2174                messageMediator.getGIOPVersion(),
2175                messageMediator.getEncodingVersion(),
2176                messageMediator.getRequestId(),
2177                user ? ReplyMessage.USER_EXCEPTION :
2178                       ReplyMessage.SYSTEM_EXCEPTION,
2179                svc,
2180                null);
2181        return createResponseHelper(messageMediator, message, null);
2182    }
2183
2184    // REVISIT - IOR arg is ignored.
2185    protected CorbaMessageMediator createResponseHelper(
2186        CorbaMessageMediator messageMediator, ReplyMessage reply, IOR ior)
2187    {
2188        // REVISIT - these should be invoked from subcontract.
2189        runServantPostInvoke(messageMediator);
2190        runInterceptors(messageMediator, reply);
2191        runRemoveThreadInfo(messageMediator);
2192
2193        if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
2194            dprint(".createResponseHelper: "
2195                   + opAndId(messageMediator) + ": "
2196                   + reply);
2197        }
2198
2199        messageMediator.setReplyHeader(reply);
2200
2201        OutputObject replyOutputObject;
2202        // REVISIT = do not use null.
2203        //
2204        if (messageMediator.getConnection() == null) {
2205            replyOutputObject =
2206                sun.corba.OutputStreamFactory.newCDROutputObject(orb,
2207                            messageMediator, messageMediator.getReplyHeader(),
2208                            messageMediator.getStreamFormatVersion(),
2209                            BufferManagerFactory.GROW);
2210        } else {
2211            replyOutputObject = messageMediator.getConnection().getAcceptor()
2212             .createOutputObject(messageMediator.getBroker(), messageMediator);
2213        }
2214        messageMediator.setOutputObject(replyOutputObject);
2215        messageMediator.getOutputObject().setMessageMediator(messageMediator);
2216
2217        reply.write((OutputStream) messageMediator.getOutputObject());
2218        if (reply.getIOR() != null) {
2219            reply.getIOR().write((OutputStream) messageMediator.getOutputObject());
2220        }
2221        // REVISIT - not necessary?
2222        //messageMediator.this.replyIOR = reply.getIOR();
2223
2224        // NOTE: The mediator holds onto output object so return value
2225        // not really necessary.
2226        return messageMediator;
2227    }
2228
2229    protected void runServantPostInvoke(CorbaMessageMediator messageMediator)
2230    {
2231        // Run ServantLocator::postinvoke.  This may cause a SystemException
2232        // which will throw out of the constructor and return later
2233        // to construct a reply for that exception.  The internal logic
2234        // of returnServant makes sure that postinvoke is only called once.
2235        // REVISIT: instead of instanceof, put method on all orbs.
2236        ORB orb = null;
2237        // This flag is to deal with BootstrapServer use of reply streams,
2238        // with ServerRequestDispatcher's use of reply streams, etc.
2239        if (messageMediator.executeReturnServantInResponseConstructor()) {
2240            // It is possible to get marshaling errors in the skeleton after
2241            // postinvoke has completed.  We must set this to false so that
2242            // when the error exception reply is constructed we don't try
2243            // to incorrectly access poa current (which will be the wrong
2244            // one or an empty stack.
2245            messageMediator.setExecuteReturnServantInResponseConstructor(false);
2246            messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(true);
2247
2248            try {
2249                orb = (ORB)messageMediator.getBroker();
2250                OAInvocationInfo info = orb.peekInvocationInfo() ;
2251                ObjectAdapter oa = info.oa();
2252                try {
2253                    oa.returnServant() ;
2254                } catch (Throwable thr) {
2255                    wrapper.unexpectedException( thr ) ;
2256
2257                    if (thr instanceof Error)
2258                        throw (Error)thr ;
2259                    else if (thr instanceof RuntimeException)
2260                        throw (RuntimeException)thr ;
2261                } finally {
2262                    oa.exit();
2263                }
2264            } catch (EmptyStackException ese) {
2265                throw wrapper.emptyStackRunServantPostInvoke( ese ) ;
2266            }
2267        }
2268    }
2269
2270    protected void runInterceptors(CorbaMessageMediator messageMediator,
2271                                   ReplyMessage reply)
2272    {
2273        if( messageMediator.executePIInResponseConstructor() ) {
2274            // Invoke server request ending interception points (send_*):
2275            // Note: this may end up with a SystemException or an internal
2276            // Runtime ForwardRequest
2277            ((ORB)messageMediator.getBroker()).getPIHandler().
2278                invokeServerPIEndingPoint( reply );
2279
2280            // Note this will be executed even if a ForwardRequest or
2281            // SystemException is thrown by a Portable Interceptors ending
2282            // point since we end up in this constructor again anyway.
2283            ((ORB)messageMediator.getBroker()).getPIHandler().
2284                cleanupServerPIRequest();
2285
2286            // See createSystemExceptionResponse for why this is necesary.
2287            messageMediator.setExecutePIInResponseConstructor(false);
2288        }
2289    }
2290
2291    protected void runRemoveThreadInfo(CorbaMessageMediator messageMediator)
2292    {
2293        // Once you get here then the final reply is available (i.e.,
2294        // postinvoke and interceptors have completed.
2295        if (messageMediator.executeRemoveThreadInfoInResponseConstructor()) {
2296            messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(false);
2297            ((ORB)messageMediator.getBroker()).popInvocationInfo() ;
2298        }
2299    }
2300
2301    protected ServiceContexts getServiceContextsForReply(
2302        CorbaMessageMediator messageMediator, ServiceContexts contexts)
2303    {
2304        CorbaConnection c = (CorbaConnection) messageMediator.getConnection();
2305
2306        if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
2307            dprint(".getServiceContextsForReply: "
2308                   + opAndId(messageMediator)
2309                   + ": " + c);
2310        }
2311
2312        if (contexts == null) {
2313            contexts = new ServiceContexts(((ORB)messageMediator.getBroker()));
2314        }
2315
2316        // NOTE : We only want to send the runtime context the first time
2317
2318        if (c != null && !c.isPostInitialContexts()) {
2319            c.setPostInitialContexts();
2320            SendingContextServiceContext scsc =
2321                new SendingContextServiceContext(
2322                    ((ORB)messageMediator.getBroker()).getFVDCodeBaseIOR()) ;
2323
2324            if (contexts.get( scsc.getId() ) != null)
2325                throw wrapper.duplicateSendingContextServiceContext() ;
2326
2327            contexts.put( scsc ) ;
2328
2329            if ( ((ORB)messageMediator.getBroker()).subcontractDebugFlag)
2330                dprint(".getServiceContextsForReply: "
2331                       + opAndId(messageMediator)
2332                       + ": added SendingContextServiceContext" ) ;
2333        }
2334
2335        // send ORBVersion servicecontext as part of the Reply
2336
2337        ORBVersionServiceContext ovsc
2338            = new ORBVersionServiceContext(ORBVersionFactory.getORBVersion());
2339
2340        if (contexts.get( ovsc.getId() ) != null)
2341            throw wrapper.duplicateOrbVersionServiceContext() ;
2342
2343        contexts.put( ovsc ) ;
2344
2345        if ( ((ORB)messageMediator.getBroker()).subcontractDebugFlag)
2346            dprint(".getServiceContextsForReply: "
2347                   + opAndId(messageMediator)
2348                   + ": added ORB version service context");
2349
2350        return contexts;
2351    }
2352
2353    // REVISIT - this method should be migrated to orbutil.ORBUtility
2354    //           since all locations that release ByteBuffers use
2355    //           very similar logic and debug information.
2356    private void releaseByteBufferToPool() {
2357        if (dispatchByteBuffer != null) {
2358            orb.getByteBufferPool().releaseByteBuffer(dispatchByteBuffer);
2359            if (transportDebug()) {
2360                int bbId = System.identityHashCode(dispatchByteBuffer);
2361                StringBuffer sb = new StringBuffer();
2362                sb.append(".handleInput: releasing ByteBuffer (" + bbId +
2363                          ") to ByteBufferPool");
2364                dprint(sb.toString());
2365             }
2366        }
2367    }
2368}
2369
2370// End of file.
2371