1/*
2 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package sun.rmi.transport.tcp;
26
27import java.io.*;
28
29/**
30 * MultiplexOutputStream manages sending data over a connection managed
31 * by a ConnectionMultiplexer object.  Data written is buffered until the
32 * internal buffer is full or the flush() method is called, at which
33 * point it attempts to push a packet of bytes through to the remote
34 * endpoint.  This will never push more bytes than the amount already
35 * requested by the remote endpoint (to prevent receive buffer from
36 * overflowing), so if the write() and flush() methods will block
37 * until their operation can complete if enough bytes cannot be
38 * pushed immediately.
39 *
40 * @author Peter Jones
41 */
42final class MultiplexOutputStream extends OutputStream {
43
44    /** object managing multiplexed connection */
45    private ConnectionMultiplexer manager;
46
47    /** information about the connection this is the output stream for */
48    private MultiplexConnectionInfo info;
49
50    /** output buffer */
51    private byte buffer[];
52
53    /** current position to write to in output buffer */
54    private int pos = 0;
55
56    /** pending number of bytes requested by remote endpoint */
57    private int requested = 0;
58
59    /** true if this connection has been disconnected */
60    private boolean disconnected = false;
61
62    /**
63     * lock acquired to access shared variables:
64     * requested & disconnected
65     * WARNING:  Any of the methods manager.send*() should not be
66     * invoked while this lock is held, since they could potentially
67     * block if the underlying connection's transport buffers are
68     * full, and the manager may need to acquire this lock to process
69     * and consume data coming over the underlying connection.
70     */
71    private Object lock = new Object();
72
73    /**
74     * Create a new MultiplexOutputStream for the given manager.
75     * @param manager object that manages this connection
76     * @param info structure for connection this stream writes to
77     * @param bufferLength length of output buffer
78     */
79    MultiplexOutputStream(
80        ConnectionMultiplexer    manager,
81        MultiplexConnectionInfo  info,
82        int                      bufferLength)
83    {
84        this.manager = manager;
85        this.info    = info;
86
87        buffer = new byte[bufferLength];
88        pos = 0;
89    }
90
91    /**
92     * Write a byte over connection.
93     * @param b byte of data to write
94     */
95    public synchronized void write(int b) throws IOException
96    {
97        while (pos >= buffer.length)
98            push();
99        buffer[pos ++] = (byte) b;
100    }
101
102    /**
103     * Write a subarray of bytes over connection.
104     * @param b array containing bytes to write
105     * @param off offset of beginning of bytes to write
106     * @param len number of bytes to write
107     */
108    public synchronized void write(byte b[], int off, int len)
109        throws IOException
110    {
111        if (len <= 0)
112            return;
113
114        // if enough free space in output buffer, just copy into there
115        int freeSpace = buffer.length - pos;
116        if (len <= freeSpace) {
117            System.arraycopy(b, off, buffer, pos, len);
118            pos += len;
119            return;
120        }
121
122        // else, flush buffer and send rest directly to avoid array copy
123        flush();
124        int local_requested;
125        while (true) {
126            synchronized (lock) {
127                while ((local_requested = requested) < 1 && !disconnected) {
128                    try {
129                        lock.wait();
130                    } catch (InterruptedException e) {
131                    }
132                }
133                if (disconnected)
134                    throw new IOException("Connection closed");
135            }
136
137            if (local_requested < len) {
138                manager.sendTransmit(info, b, off, local_requested);
139                off += local_requested;
140                len -= local_requested;
141                synchronized (lock) {
142                    requested -= local_requested;
143                }
144            }
145            else {
146                manager.sendTransmit(info, b, off, len);
147                synchronized (lock) {
148                    requested -= len;
149                }
150                // len = 0;
151                break;
152            }
153        }
154    }
155
156    /**
157     * Guarantee that all data written to this stream has been pushed
158     * over and made available to the remote endpoint.
159     */
160    public synchronized void flush() throws IOException {
161        while (pos > 0)
162            push();
163    }
164
165    /**
166     * Close this connection.
167     */
168    public void close() throws IOException
169    {
170        manager.sendClose(info);
171    }
172
173    /**
174     * Take note of more bytes requested by connection at remote endpoint.
175     * @param num number of additional bytes requested
176     */
177    void request(int num)
178    {
179        synchronized (lock) {
180            requested += num;
181            lock.notifyAll();
182        }
183    }
184
185    /**
186     * Disconnect this stream from all connection activity.
187     */
188    void disconnect()
189    {
190        synchronized (lock) {
191            disconnected = true;
192            lock.notifyAll();
193        }
194    }
195
196    /**
197     * Push bytes in output buffer to connection at remote endpoint.
198     * This method blocks until at least one byte has been pushed across.
199     */
200    private void push() throws IOException
201    {
202        int local_requested;
203        synchronized (lock) {
204            while ((local_requested = requested) < 1 && !disconnected) {
205                try {
206                    lock.wait();
207                } catch (InterruptedException e) {
208                }
209            }
210            if (disconnected)
211                throw new IOException("Connection closed");
212        }
213
214        if (local_requested < pos) {
215            manager.sendTransmit(info, buffer, 0, local_requested);
216            System.arraycopy(buffer, local_requested,
217                             buffer, 0, pos - local_requested);
218            pos -= local_requested;
219            synchronized (lock) {
220                requested -= local_requested;
221            }
222        }
223        else {
224            manager.sendTransmit(info, buffer, 0, pos);
225            synchronized (lock) {
226                requested -= pos;
227            }
228            pos = 0;
229        }
230    }
231}
232