BufferManagerReadStream.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2000, 2009, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package com.sun.corba.se.impl.encoding;
26
27import java.nio.ByteBuffer;
28import com.sun.corba.se.pept.transport.ByteBufferPool;
29import com.sun.corba.se.spi.logging.CORBALogDomains;
30import com.sun.corba.se.spi.orb.ORB;
31import com.sun.corba.se.impl.logging.ORBUtilSystemException;
32import com.sun.corba.se.impl.orbutil.ORBUtility;
33import com.sun.corba.se.impl.protocol.RequestCanceledException;
34import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
35import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
36import java.util.*;
37
38public class BufferManagerReadStream
39    implements BufferManagerRead, MarkAndResetHandler
40{
41    private boolean receivedCancel = false;
42    private int cancelReqId = 0;
43
44    // We should convert endOfStream to a final static dummy end node
45    private boolean endOfStream = true;
46    private BufferQueue fragmentQueue = new BufferQueue();
47    private long FRAGMENT_TIMEOUT = 60000;
48
49    // REVISIT - This should go in BufferManagerRead. But, since
50    //           BufferManagerRead is an interface. BufferManagerRead
51    //           might ought to be an abstract class instead of an
52    //           interface.
53    private ORB orb ;
54    private ORBUtilSystemException wrapper ;
55    private boolean debug = false;
56
57    BufferManagerReadStream( ORB orb )
58    {
59        this.orb = orb ;
60        this.wrapper = ORBUtilSystemException.get( orb,
61            CORBALogDomains.RPC_ENCODING ) ;
62        debug = orb.transportDebugFlag;
63    }
64
65    public void cancelProcessing(int requestId) {
66        synchronized(fragmentQueue) {
67            receivedCancel = true;
68            cancelReqId = requestId;
69            fragmentQueue.notify();
70        }
71    }
72
73    public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg)
74    {
75        ByteBufferWithInfo bbwi =
76            new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());
77
78        synchronized (fragmentQueue) {
79            if (debug)
80            {
81                // print address of ByteBuffer being queued
82                int bbAddress = System.identityHashCode(byteBuffer);
83                StringBuffer sb = new StringBuffer(80);
84                sb.append("processFragment() - queueing ByteBuffer id (");
85                sb.append(bbAddress).append(") to fragment queue.");
86                String strMsg = sb.toString();
87                dprint(strMsg);
88            }
89            fragmentQueue.enqueue(bbwi);
90            endOfStream = !msg.moreFragmentsToFollow();
91            fragmentQueue.notify();
92        }
93    }
94
95    public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
96    {
97
98      ByteBufferWithInfo result = null;
99
100      try {
101          //System.out.println("ENTER underflow");
102
103        synchronized (fragmentQueue) {
104
105            if (receivedCancel) {
106                throw new RequestCanceledException(cancelReqId);
107            }
108
109            while (fragmentQueue.size() == 0) {
110
111                if (endOfStream) {
112                    throw wrapper.endOfStream() ;
113                }
114
115                boolean interrupted = false;
116                try {
117                    fragmentQueue.wait(FRAGMENT_TIMEOUT);
118                } catch (InterruptedException e) {
119                    interrupted = true;
120                }
121
122                if (!interrupted && fragmentQueue.size() == 0) {
123                    throw wrapper.bufferReadManagerTimeout();
124                }
125
126                if (receivedCancel) {
127                    throw new RequestCanceledException(cancelReqId);
128                }
129            }
130
131            result = fragmentQueue.dequeue();
132            result.fragmented = true;
133
134            if (debug)
135            {
136                // print address of ByteBuffer being dequeued
137                int bbAddr = System.identityHashCode(result.byteBuffer);
138                StringBuffer sb1 = new StringBuffer(80);
139                sb1.append("underflow() - dequeued ByteBuffer id (");
140                sb1.append(bbAddr).append(") from fragment queue.");
141                String msg1 = sb1.toString();
142                dprint(msg1);
143            }
144
145            // VERY IMPORTANT
146            // Release bbwi.byteBuffer to the ByteBufferPool only if
147            // this BufferManagerStream is not marked for potential restore.
148            if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
149            {
150                ByteBufferPool byteBufferPool = getByteBufferPool();
151
152                if (debug)
153                {
154                    // print address of ByteBuffer being released
155                    int bbAddress = System.identityHashCode(bbwi.byteBuffer);
156                    StringBuffer sb = new StringBuffer(80);
157                    sb.append("underflow() - releasing ByteBuffer id (");
158                    sb.append(bbAddress).append(") to ByteBufferPool.");
159                    String msg = sb.toString();
160                    dprint(msg);
161                }
162
163                byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
164                bbwi.byteBuffer = null;
165                bbwi = null;
166            }
167        }
168        return result;
169      } finally {
170          //System.out.println("EXIT underflow");
171      }
172    }
173
174    public void init(Message msg) {
175        if (msg != null)
176            endOfStream = !msg.moreFragmentsToFollow();
177    }
178
179    // Release any queued ByteBufferWithInfo's byteBuffers to the
180    // ByteBufferPoool
181    public void close(ByteBufferWithInfo bbwi)
182    {
183        int inputBbAddress = 0;
184
185        // release ByteBuffers on fragmentQueue
186        if (fragmentQueue != null)
187        {
188            synchronized (fragmentQueue)
189            {
190                // IMPORTANT: The fragment queue may have one ByteBuffer
191                //            on it that's also on the CDRInputStream if
192                //            this method is called when the stream is 'marked'.
193                //            Thus, we'll compare the ByteBuffer passed
194                //            in (from a CDRInputStream) with all ByteBuffers
195                //            on the stack. If one is found to equal, it will
196                //            not be released to the ByteBufferPool.
197                if (bbwi != null)
198                {
199                    inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
200                }
201
202                ByteBufferWithInfo abbwi = null;
203                ByteBufferPool byteBufferPool = getByteBufferPool();
204                while (fragmentQueue.size() != 0)
205                {
206                    abbwi = fragmentQueue.dequeue();
207                    if (abbwi != null && abbwi.byteBuffer != null)
208                    {
209                        int bbAddress = System.identityHashCode(abbwi.byteBuffer);
210                        if (inputBbAddress != bbAddress)
211                        {
212                            if (debug)
213                            {
214                                 // print address of ByteBuffer released
215                                 StringBuffer sb = new StringBuffer(80);
216                                 sb.append("close() - fragmentQueue is ")
217                                   .append("releasing ByteBuffer id (")
218                                   .append(bbAddress).append(") to ")
219                                   .append("ByteBufferPool.");
220                                 String msg = sb.toString();
221                                 dprint(msg);
222                            }
223                        }
224                        byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
225                    }
226                }
227            }
228            fragmentQueue = null;
229        }
230
231        // release ByteBuffers on fragmentStack
232        if (fragmentStack != null && fragmentStack.size() != 0)
233        {
234            // IMPORTANT: The fragment stack may have one ByteBuffer
235            //            on it that's also on the CDRInputStream if
236            //            this method is called when the stream is 'marked'.
237            //            Thus, we'll compare the ByteBuffer passed
238            //            in (from a CDRInputStream) with all ByteBuffers
239            //            on the stack. If one is found to equal, it will
240            //            not be released to the ByteBufferPool.
241            if (bbwi != null)
242            {
243                inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
244            }
245
246            ByteBufferWithInfo abbwi = null;
247            ByteBufferPool byteBufferPool = getByteBufferPool();
248            ListIterator itr = fragmentStack.listIterator();
249            while (itr.hasNext())
250            {
251                abbwi = (ByteBufferWithInfo)itr.next();
252
253                if (abbwi != null && abbwi.byteBuffer != null)
254                {
255                   int bbAddress = System.identityHashCode(abbwi.byteBuffer);
256                   if (inputBbAddress != bbAddress)
257                   {
258                       if (debug)
259                       {
260                            // print address of ByteBuffer being released
261                            StringBuffer sb = new StringBuffer(80);
262                            sb.append("close() - fragmentStack - releasing ")
263                              .append("ByteBuffer id (" + bbAddress + ") to ")
264                              .append("ByteBufferPool.");
265                            String msg = sb.toString();
266                            dprint(msg);
267                       }
268                       byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
269                   }
270                }
271            }
272            fragmentStack = null;
273        }
274
275    }
276
277    protected ByteBufferPool getByteBufferPool()
278    {
279        return orb.getByteBufferPool();
280    }
281
282    private void dprint(String msg)
283    {
284        ORBUtility.dprint("BufferManagerReadStream", msg);
285    }
286
287    // Mark and reset handler ----------------------------------------
288
289    private boolean markEngaged = false;
290
291    // List of fragment ByteBufferWithInfos received since
292    // the mark was engaged.
293    private LinkedList fragmentStack = null;
294    private RestorableInputStream inputStream = null;
295
296    // Original state of the stream
297    private Object streamMemento = null;
298
299    public void mark(RestorableInputStream inputStream)
300    {
301        this.inputStream = inputStream;
302        markEngaged = true;
303
304        // Get the magic Object that the stream will use to
305        // reconstruct it's state when reset is called
306        streamMemento = inputStream.createStreamMemento();
307
308        if (fragmentStack != null) {
309            fragmentStack.clear();
310        }
311    }
312
313    // Collects fragments received since the mark was engaged.
314    public void fragmentationOccured(ByteBufferWithInfo newFragment)
315    {
316        if (!markEngaged)
317            return;
318
319        if (fragmentStack == null)
320            fragmentStack = new LinkedList();
321
322        fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
323    }
324
325    public void reset()
326    {
327        if (!markEngaged) {
328            // REVISIT - call to reset without call to mark
329            return;
330        }
331
332        markEngaged = false;
333
334        // If we actually did peek across fragments, we need
335        // to push those fragments onto the front of the
336        // buffer queue.
337        if (fragmentStack != null && fragmentStack.size() != 0) {
338            ListIterator iter = fragmentStack.listIterator();
339
340            synchronized(fragmentQueue) {
341                while (iter.hasNext()) {
342                    fragmentQueue.push((ByteBufferWithInfo)iter.next());
343                }
344            }
345
346            fragmentStack.clear();
347        }
348
349        // Give the stream the magic Object to restore
350        // it's state.
351        inputStream.restoreInternalState(streamMemento);
352    }
353
354    public MarkAndResetHandler getMarkAndResetHandler() {
355        return this;
356    }
357}
358