1/* 2 * Copyright (c) 1996, 1997, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25package sun.rmi.transport.tcp; 26 27import java.io.*; 28 29/** 30 * MultiplexInputStream manages receiving data over a connection managed 31 * by a ConnectionMultiplexer object. This object is responsible for 32 * requesting more bytes of data as space in its internal buffer becomes 33 * available. 34 * 35 * @author Peter Jones 36 */ 37final class MultiplexInputStream extends InputStream { 38 39 /** object managing multiplexed connection */ 40 private ConnectionMultiplexer manager; 41 42 /** information about the connection this is the input stream for */ 43 private MultiplexConnectionInfo info; 44 45 /** input buffer */ 46 private byte buffer[]; 47 48 /** number of real data bytes present in buffer */ 49 private int present = 0; 50 51 /** current position to read from in input buffer */ 52 private int pos = 0; 53 54 /** pending number of bytes this stream has requested */ 55 private int requested = 0; 56 57 /** true if this connection has been disconnected */ 58 private boolean disconnected = false; 59 60 /** 61 * lock acquired to access shared variables: 62 * buffer, present, pos, requested, & disconnected 63 * WARNING: Any of the methods manager.send*() should not be 64 * invoked while this lock is held, since they could potentially 65 * block if the underlying connection's transport buffers are 66 * full, and the manager may need to acquire this lock to process 67 * and consume data coming over the underlying connection. 68 */ 69 private Object lock = new Object(); 70 71 /** level at which more data is requested when read past */ 72 private int waterMark; 73 74 /** data structure for holding reads of one byte */ 75 private byte temp[] = new byte[1]; 76 77 /** 78 * Create a new MultiplexInputStream for the given manager. 79 * @param manager object that manages this connection 80 * @param info structure for connection this stream reads from 81 * @param bufferLength length of input buffer 82 */ 83 MultiplexInputStream( 84 ConnectionMultiplexer manager, 85 MultiplexConnectionInfo info, 86 int bufferLength) 87 { 88 this.manager = manager; 89 this.info = info; 90 91 buffer = new byte[bufferLength]; 92 waterMark = bufferLength / 2; 93 } 94 95 /** 96 * Read a byte from the connection. 97 */ 98 public synchronized int read() throws IOException 99 { 100 int n = read(temp, 0, 1); 101 if (n != 1) 102 return -1; 103 return temp[0] & 0xFF; 104 } 105 106 /** 107 * Read a subarray of bytes from connection. This method blocks for 108 * at least one byte, and it returns the number of bytes actually read, 109 * or -1 if the end of the stream was detected. 110 * @param b array to read bytes into 111 * @param off offset of beginning of bytes to read into 112 * @param len number of bytes to read 113 */ 114 public synchronized int read(byte b[], int off, int len) throws IOException 115 { 116 if (len <= 0) 117 return 0; 118 119 int moreSpace; 120 synchronized (lock) { 121 if (pos >= present) 122 pos = present = 0; 123 else if (pos >= waterMark) { 124 System.arraycopy(buffer, pos, buffer, 0, present - pos); 125 present -= pos; 126 pos = 0; 127 } 128 int freeSpace = buffer.length - present; 129 moreSpace = Math.max(freeSpace - requested, 0); 130 } 131 if (moreSpace > 0) 132 manager.sendRequest(info, moreSpace); 133 synchronized (lock) { 134 requested += moreSpace; 135 while ((pos >= present) && !disconnected) { 136 try { 137 lock.wait(); 138 } catch (InterruptedException e) { 139 } 140 } 141 if (disconnected && pos >= present) 142 return -1; 143 144 int available = present - pos; 145 if (len < available) { 146 System.arraycopy(buffer, pos, b, off, len); 147 pos += len; 148 return len; 149 } 150 else { 151 System.arraycopy(buffer, pos, b, off, available); 152 pos = present = 0; 153 // could send another request here, if len > available?? 154 return available; 155 } 156 } 157 } 158 159 /** 160 * Return the number of bytes immediately available for reading. 161 */ 162 public int available() throws IOException 163 { 164 synchronized (lock) { 165 return present - pos; 166 } 167 } 168 169 /** 170 * Close this connection. 171 */ 172 public void close() throws IOException 173 { 174 manager.sendClose(info); 175 } 176 177 /** 178 * Receive bytes transmitted from connection at remote endpoint. 179 * @param length number of bytes transmitted 180 * @param in input stream with those bytes ready to be read 181 */ 182 void receive(int length, DataInputStream in) 183 throws IOException 184 { 185 /* TO DO: Optimize so that data received from stream can be loaded 186 * directly into user's buffer if there is a pending read(). 187 */ 188 synchronized (lock) { 189 if ((pos > 0) && ((buffer.length - present) < length)) { 190 System.arraycopy(buffer, pos, buffer, 0, present - pos); 191 present -= pos; 192 pos = 0; 193 } 194 if ((buffer.length - present) < length) 195 throw new IOException("Receive buffer overflow"); 196 in.readFully(buffer, present, length); 197 present += length; 198 requested -= length; 199 lock.notifyAll(); 200 } 201 } 202 203 /** 204 * Disconnect this stream from all connection activity. 205 */ 206 void disconnect() 207 { 208 synchronized (lock) { 209 disconnected = true; 210 lock.notifyAll(); 211 } 212 } 213} 214