1/*
2 * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package sun.rmi.transport.tcp;
26
27import java.io.*;
28
29/**
30 * MultiplexInputStream manages receiving data over a connection managed
31 * by a ConnectionMultiplexer object.  This object is responsible for
32 * requesting more bytes of data as space in its internal buffer becomes
33 * available.
34 *
35 * @author Peter Jones
36 */
37final class MultiplexInputStream extends InputStream {
38
39    /** object managing multiplexed connection */
40    private ConnectionMultiplexer manager;
41
42    /** information about the connection this is the input stream for */
43    private MultiplexConnectionInfo info;
44
45    /** input buffer */
46    private byte buffer[];
47
48    /** number of real data bytes present in buffer */
49    private int present = 0;
50
51    /** current position to read from in input buffer */
52    private int pos = 0;
53
54    /** pending number of bytes this stream has requested */
55    private int requested = 0;
56
57    /** true if this connection has been disconnected */
58    private boolean disconnected = false;
59
60    /**
61     * lock acquired to access shared variables:
62     * buffer, present, pos, requested, & disconnected
63     * WARNING:  Any of the methods manager.send*() should not be
64     * invoked while this lock is held, since they could potentially
65     * block if the underlying connection's transport buffers are
66     * full, and the manager may need to acquire this lock to process
67     * and consume data coming over the underlying connection.
68     */
69    private Object lock = new Object();
70
71    /** level at which more data is requested when read past */
72    private int waterMark;
73
74    /** data structure for holding reads of one byte */
75    private byte temp[] = new byte[1];
76
77    /**
78     * Create a new MultiplexInputStream for the given manager.
79     * @param manager object that manages this connection
80     * @param info structure for connection this stream reads from
81     * @param bufferLength length of input buffer
82     */
83    MultiplexInputStream(
84        ConnectionMultiplexer    manager,
85        MultiplexConnectionInfo  info,
86        int                      bufferLength)
87    {
88        this.manager = manager;
89        this.info    = info;
90
91        buffer = new byte[bufferLength];
92        waterMark = bufferLength / 2;
93    }
94
95    /**
96     * Read a byte from the connection.
97     */
98    public synchronized int read() throws IOException
99    {
100        int n = read(temp, 0, 1);
101        if (n != 1)
102            return -1;
103        return temp[0] & 0xFF;
104    }
105
106    /**
107     * Read a subarray of bytes from connection.  This method blocks for
108     * at least one byte, and it returns the number of bytes actually read,
109     * or -1 if the end of the stream was detected.
110     * @param b array to read bytes into
111     * @param off offset of beginning of bytes to read into
112     * @param len number of bytes to read
113     */
114    public synchronized int read(byte b[], int off, int len) throws IOException
115    {
116        if (len <= 0)
117            return 0;
118
119        int moreSpace;
120        synchronized (lock) {
121            if (pos >= present)
122                pos = present = 0;
123            else if (pos >= waterMark) {
124                System.arraycopy(buffer, pos, buffer, 0, present - pos);
125                present -= pos;
126                pos = 0;
127            }
128            int freeSpace = buffer.length - present;
129            moreSpace = Math.max(freeSpace - requested, 0);
130        }
131        if (moreSpace > 0)
132            manager.sendRequest(info, moreSpace);
133        synchronized (lock) {
134            requested += moreSpace;
135            while ((pos >= present) && !disconnected) {
136                try {
137                    lock.wait();
138                } catch (InterruptedException e) {
139                }
140            }
141            if (disconnected && pos >= present)
142                return -1;
143
144            int available = present - pos;
145            if (len < available) {
146                System.arraycopy(buffer, pos, b, off, len);
147                pos += len;
148                return len;
149            }
150            else {
151                System.arraycopy(buffer, pos, b, off, available);
152                pos = present = 0;
153                // could send another request here, if len > available??
154                return available;
155            }
156        }
157    }
158
159    /**
160     * Return the number of bytes immediately available for reading.
161     */
162    public int available() throws IOException
163    {
164        synchronized (lock) {
165            return present - pos;
166        }
167    }
168
169    /**
170     * Close this connection.
171     */
172    public void close() throws IOException
173    {
174        manager.sendClose(info);
175    }
176
177    /**
178     * Receive bytes transmitted from connection at remote endpoint.
179     * @param length number of bytes transmitted
180     * @param in input stream with those bytes ready to be read
181     */
182    void receive(int length, DataInputStream in)
183        throws IOException
184    {
185        /* TO DO: Optimize so that data received from stream can be loaded
186         * directly into user's buffer if there is a pending read().
187         */
188        synchronized (lock) {
189            if ((pos > 0) && ((buffer.length - present) < length)) {
190                System.arraycopy(buffer, pos, buffer, 0, present - pos);
191                present -= pos;
192                pos = 0;
193            }
194            if ((buffer.length - present) < length)
195                throw new IOException("Receive buffer overflow");
196            in.readFully(buffer, present, length);
197            present += length;
198            requested -= length;
199            lock.notifyAll();
200        }
201    }
202
203    /**
204     * Disconnect this stream from all connection activity.
205     */
206    void disconnect()
207    {
208        synchronized (lock) {
209            disconnected = true;
210            lock.notifyAll();
211        }
212    }
213}
214