/*
 * Copyright (c) 1996, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package sun.rmi.transport.tcp;

import java.io.*;

/**
 * MultiplexOutputStream manages sending data over a connection managed
 * by a ConnectionMultiplexer object. Data written is buffered until the
 * internal buffer is full or the flush() method is called, at which
 * point it attempts to push a packet of bytes through to the remote
 * endpoint. This will never push more bytes than the amount already
 * requested by the remote endpoint (to prevent receive buffer from
 * overflowing), so the write() and flush() methods will block
 * until their operation can complete if enough bytes cannot be
 * pushed immediately.
 *
 * The write() and flush() methods are synchronized on this stream;
 * the separate internal lock only guards the state that request()
 * and disconnect() update.
 *
 * @author Peter Jones
 */
final class MultiplexOutputStream extends OutputStream {

    /** object managing multiplexed connection */
    private final ConnectionMultiplexer manager;

    /** information about the connection this is the output stream for */
    private final MultiplexConnectionInfo info;

    /** output buffer */
    private final byte[] buffer;

    /** current position to write to in output buffer */
    private int pos = 0;

    /** pending number of bytes requested by remote endpoint */
    private int requested = 0;

    /** true if this connection has been disconnected */
    private boolean disconnected = false;

    /**
     * lock acquired to access shared variables:
     * requested & disconnected
     * WARNING: Any of the methods manager.send*() should not be
     * invoked while this lock is held, since they could potentially
     * block if the underlying connection's transport buffers are
     * full, and the manager may need to acquire this lock to process
     * and consume data coming over the underlying connection.
     */
    private final Object lock = new Object();

    /**
     * Create a new MultiplexOutputStream for the given manager.
     * @param manager object that manages this connection
     * @param info structure for connection this stream writes to
     * @param bufferLength length of output buffer
     */
    MultiplexOutputStream(
        ConnectionMultiplexer manager,
        MultiplexConnectionInfo info,
        int bufferLength)
    {
        this.manager = manager;
        this.info = info;

        buffer = new byte[bufferLength];
    }

    /**
     * Write a byte over connection.
     * The byte is stored in the output buffer; if the buffer is full,
     * this blocks pushing buffered bytes until there is room.
     * @param b byte of data to write
     * @throws IOException if the connection has been disconnected or
     * the manager fails to transmit
     */
    @Override
    public synchronized void write(int b) throws IOException
    {
        if (buffer.length == 0) {
            // A zero-length buffer can never hold the byte, and pushing
            // an empty buffer makes no progress, so send it unbuffered.
            write(new byte[] { (byte) b }, 0, 1);
            return;
        }
        while (pos >= buffer.length) {
            push();
        }
        buffer[pos++] = (byte) b;
    }

    /**
     * Write a subarray of bytes over connection.
     * If the bytes fit in the free space of the output buffer they are
     * only copied there; otherwise the buffer is flushed and the bytes
     * are transmitted directly from the caller's array, in chunks no
     * larger than what the remote endpoint has requested.
     * @param b array containing bytes to write
     * @param off offset of beginning of bytes to write
     * @param len number of bytes to write; nothing is written if this
     * is zero or negative
     * @throws IndexOutOfBoundsException if len is positive and the
     * range given by off and len does not lie within b
     * @throws IOException if the connection has been disconnected or
     * the manager fails to transmit
     */
    @Override
    public synchronized void write(byte[] b, int off, int len)
        throws IOException
    {
        if (len <= 0) {
            return;
        }

        // Reject a bad range before anything is flushed or transmitted,
        // so a caller error cannot leave a partially sent write behind.
        if (off < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException(
                "off=" + off + ", len=" + len + ", b.length=" + b.length);
        }

        // if enough free space in output buffer, just copy into there
        int freeSpace = buffer.length - pos;
        if (len <= freeSpace) {
            System.arraycopy(b, off, buffer, pos, len);
            pos += len;
            return;
        }

        // else, flush buffer and send rest directly to avoid array copy
        flush();
        while (len > 0) {
            int chunk = Math.min(awaitRequested(), len);

            // transmit without holding the lock (see warning on lock)
            manager.sendTransmit(info, b, off, chunk);
            off += chunk;
            len -= chunk;
            synchronized (lock) {
                requested -= chunk;
            }
        }
    }

    /**
     * Guarantee that all data written to this stream has been pushed
     * over and made available to the remote endpoint.
     * @throws IOException if the connection has been disconnected or
     * the manager fails to transmit
     */
    @Override
    public synchronized void flush() throws IOException
    {
        while (pos > 0) {
            push();
        }
    }

    /**
     * Close this connection.
     * This only asks the manager to send a close for this connection;
     * bytes still held in the output buffer are not flushed here.
     */
    @Override
    public void close() throws IOException
    {
        manager.sendClose(info);
    }

    /**
     * Take note of more bytes requested by connection at remote endpoint.
     * Wakes up any writer waiting for requested bytes.
     * @param num number of additional bytes requested
     */
    void request(int num)
    {
        synchronized (lock) {
            requested += num;
            lock.notifyAll();
        }
    }

    /**
     * Disconnect this stream from all connection activity.
     * Wakes up any waiting writer, which then fails with an IOException.
     */
    void disconnect()
    {
        synchronized (lock) {
            disconnected = true;
            lock.notifyAll();
        }
    }

    /**
     * Push bytes in output buffer to connection at remote endpoint.
     * This method blocks until at least one byte has been pushed across.
     * Bytes that could not be sent yet are moved to the front of the
     * buffer.
     */
    private void push() throws IOException
    {
        int chunk = Math.min(awaitRequested(), pos);

        // transmit without holding the lock (see warning on lock)
        manager.sendTransmit(info, buffer, 0, chunk);
        if (chunk < pos) {
            System.arraycopy(buffer, chunk, buffer, 0, pos - chunk);
        }
        pos -= chunk;
        synchronized (lock) {
            requested -= chunk;
        }
    }

    /**
     * Block until the remote endpoint has requested at least one byte,
     * then return the number of bytes currently requested.
     * An interrupt does not abort the wait, but the thread's interrupt
     * status is restored before returning so that it is not lost.
     * @return the pending number of requested bytes (at least 1)
     * @throws IOException if this stream has been disconnected
     */
    private int awaitRequested() throws IOException
    {
        boolean interrupted = false;
        try {
            synchronized (lock) {
                while (requested < 1 && !disconnected) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        // keep waiting; re-assert the interrupt on exit
                        interrupted = true;
                    }
                }
                if (disconnected) {
                    throw new IOException("Connection closed");
                }
                return requested;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}