CorbaResponseWaitingRoomImpl.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2001, 2012, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.transport;
27
28import java.util.Collections;
29import java.util.HashMap;
30import java.util.Iterator;
31import java.util.Map;
32
33import org.omg.CORBA.CompletionStatus;
34import org.omg.CORBA.SystemException;
35
36import com.sun.corba.se.pept.encoding.InputObject;
37import com.sun.corba.se.pept.encoding.OutputObject;
38import com.sun.corba.se.pept.protocol.MessageMediator;
39
40import com.sun.corba.se.spi.logging.CORBALogDomains;
41import com.sun.corba.se.spi.orb.ORB;
42import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
43import com.sun.corba.se.spi.transport.CorbaConnection;
44import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
45
46import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
47import com.sun.corba.se.impl.encoding.CDRInputObject;
48import com.sun.corba.se.impl.logging.ORBUtilSystemException;
49import com.sun.corba.se.impl.orbutil.ORBUtility;
50import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
51import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
52
53/**
54 * @author Harold Carr
55 */
56public class CorbaResponseWaitingRoomImpl
57    implements
58        CorbaResponseWaitingRoom
59{
60    final static class OutCallDesc
61    {
62        java.lang.Object done = new java.lang.Object();
63        Thread thread;
64        MessageMediator messageMediator;
65        SystemException exception;
66        InputObject inputObject;
67    }
68
69    private ORB orb;
70    private ORBUtilSystemException wrapper ;
71
72    private CorbaConnection connection;
73    // Maps requestId to an OutCallDesc.
74    final private Map<Integer, OutCallDesc> out_calls;
75
76    public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection)
77    {
78        this.orb = orb;
79        wrapper = ORBUtilSystemException.get( orb,
80            CORBALogDomains.RPC_TRANSPORT ) ;
81        this.connection = connection;
82        out_calls =
83            Collections.synchronizedMap(new HashMap<Integer, OutCallDesc>());
84    }
85
86    ////////////////////////////////////////////////////
87    //
88    // pept.transport.ResponseWaitingRoom
89    //
90
91    public void registerWaiter(MessageMediator mediator)
92    {
93        CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
94
95        if (orb.transportDebugFlag) {
96            dprint(".registerWaiter: " + opAndId(messageMediator));
97        }
98
99        Integer requestId = messageMediator.getRequestIdInteger();
100
101        OutCallDesc call = new OutCallDesc();
102        call.thread = Thread.currentThread();
103        call.messageMediator = messageMediator;
104        out_calls.put(requestId, call);
105    }
106
107    public void unregisterWaiter(MessageMediator mediator)
108    {
109        CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
110
111        if (orb.transportDebugFlag) {
112            dprint(".unregisterWaiter: " + opAndId(messageMediator));
113        }
114
115        Integer requestId = messageMediator.getRequestIdInteger();
116
117        out_calls.remove(requestId);
118    }
119
120    public InputObject waitForResponse(MessageMediator mediator)
121    {
122      CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
123
124      try {
125
126        InputObject returnStream = null;
127
128        if (orb.transportDebugFlag) {
129            dprint(".waitForResponse->: " + opAndId(messageMediator));
130        }
131
132        Integer requestId = messageMediator.getRequestIdInteger();
133
134        if (messageMediator.isOneWay()) {
135            // The waiter is removed in releaseReply in the same
136            // way as a normal request.
137
138            if (orb.transportDebugFlag) {
139                dprint(".waitForResponse: one way - not waiting: "
140                       + opAndId(messageMediator));
141            }
142
143            return null;
144        }
145
146        OutCallDesc call = out_calls.get(requestId);
147        if (call == null) {
148            throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
149        }
150
151        synchronized(call.done) {
152
153            while (call.inputObject == null && call.exception == null) {
154                // Wait for the reply from the server.
155                // The ReaderThread reads in the reply IIOP message
156                // and signals us.
157                try {
158                    if (orb.transportDebugFlag) {
159                        dprint(".waitForResponse: waiting: "
160                               + opAndId(messageMediator));
161                    }
162                    call.done.wait();
163                } catch (InterruptedException ie) {};
164            }
165
166            if (call.exception != null) {
167                if (orb.transportDebugFlag) {
168                    dprint(".waitForResponse: exception: "
169                           + opAndId(messageMediator));
170                }
171                throw call.exception;
172            }
173
174            returnStream = call.inputObject;
175        }
176
177        // REVISIT -- exceptions from unmarshaling code will
178        // go up through this client thread!
179
180        if (returnStream != null) {
181            // On fragmented streams the header MUST be unmarshaled here
182            // (in the client thread) in case it blocks.
183            // If the header was already unmarshaled, this won't
184            // do anything
185            // REVISIT: cast - need interface method.
186            ((CDRInputObject)returnStream).unmarshalHeader();
187        }
188
189        return returnStream;
190
191      } finally {
192        if (orb.transportDebugFlag) {
193            dprint(".waitForResponse<-: " + opAndId(messageMediator));
194        }
195      }
196    }
197
198    public void responseReceived(InputObject is)
199    {
200        CDRInputObject inputObject = (CDRInputObject) is;
201        LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
202            inputObject.getMessageHeader();
203        Integer requestId = new Integer(header.getRequestId());
204        OutCallDesc call = out_calls.get(requestId);
205
206        if (orb.transportDebugFlag) {
207            dprint(".responseReceived: id/"
208                   + requestId  + ": "
209                   + header);
210        }
211
212        // This is an interesting case.  It could mean that someone sent us a
213        // reply message, but we don't know what request it was for.  That
214        // would probably call for an error.  However, there's another case
215        // that's normal and we should think about --
216        //
217        // If the unmarshaling thread does all of its work inbetween the time
218        // the ReaderThread gives it the last fragment and gets to the
219        // out_calls.get line, then it will also be null, so just return;
220        if (call == null) {
221            if (orb.transportDebugFlag) {
222                dprint(".responseReceived: id/"
223                       + requestId
224                       + ": no waiter: "
225                       + header);
226            }
227            return;
228        }
229
230        // Set the reply InputObject and signal the client thread
231        // that the reply has been received.
232        // The thread signalled will remove outcall descriptor if appropriate.
233        // Otherwise, it'll be removed when last fragment for it has been put on
234        // BufferManagerRead's queue.
235        synchronized (call.done) {
236            CorbaMessageMediator messageMediator = (CorbaMessageMediator)
237                call.messageMediator;
238
239            if (orb.transportDebugFlag) {
240                dprint(".responseReceived: "
241                       + opAndId(messageMediator)
242                       + ": notifying waiters");
243            }
244
245            messageMediator.setReplyHeader(header);
246            messageMediator.setInputObject(is);
247            inputObject.setMessageMediator(messageMediator);
248            call.inputObject = is;
249            call.done.notify();
250        }
251    }
252
253    public int numberRegistered()
254    {
255        return out_calls.size();
256    }
257
258    //////////////////////////////////////////////////
259    //
260    // CorbaResponseWaitingRoom
261    //
262
263    public void signalExceptionToAllWaiters(SystemException systemException)
264    {
265
266        if (orb.transportDebugFlag) {
267            dprint(".signalExceptionToAllWaiters: " + systemException);
268        }
269
270        synchronized (out_calls) {
271            if (orb.transportDebugFlag) {
272                dprint(".signalExceptionToAllWaiters: out_calls size :" +
273                       out_calls.size());
274            }
275
276            for (OutCallDesc call : out_calls.values()) {
277                if (orb.transportDebugFlag) {
278                    dprint(".signalExceptionToAllWaiters: signaling " +
279                            call);
280                }
281                synchronized(call.done) {
282                    try {
283                        // anything waiting for BufferManagerRead's fragment queue
284                        // needs to be cancelled
285                        CorbaMessageMediator corbaMsgMediator =
286                                     (CorbaMessageMediator)call.messageMediator;
287                        CDRInputObject inputObject =
288                                   (CDRInputObject)corbaMsgMediator.getInputObject();
289                        // IMPORTANT: If inputObject is null, then no need to tell
290                        //            BufferManagerRead to cancel request processing.
291                        if (inputObject != null) {
292                            BufferManagerReadStream bufferManager =
293                                (BufferManagerReadStream)inputObject.getBufferManager();
294                            int requestId = corbaMsgMediator.getRequestId();
295                            bufferManager.cancelProcessing(requestId);
296                        }
297                    } catch (Exception e) {
298                    } finally {
299                        // attempt to wake up waiting threads in all cases
300                        call.inputObject = null;
301                        call.exception = systemException;
302                        call.done.notifyAll();
303                    }
304                }
305            }
306        }
307    }
308
309    public MessageMediator getMessageMediator(int requestId)
310    {
311        Integer id = new Integer(requestId);
312        OutCallDesc call = out_calls.get(id);
313        if (call == null) {
314            // This can happen when getting early reply fragments for a
315            // request which has completed (e.g., client marshaling error).
316            return null;
317        }
318        return call.messageMediator;
319    }
320
321    ////////////////////////////////////////////////////
322    //
323    // Implementation.
324    //
325
326    protected void dprint(String msg)
327    {
328        ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
329    }
330
331    protected String opAndId(CorbaMessageMediator mediator)
332    {
333        return ORBUtility.operationNameAndRequestId(mediator);
334    }
335}
336
337// End of file.
338