1/*
2 * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package sun.rmi.transport.tcp;
26
27import java.io.*;
28import java.util.*;
29import java.rmi.server.LogStream;
30import java.security.PrivilegedAction;
31
32import sun.rmi.runtime.Log;
33
34/**
35 * ConnectionMultiplexer manages the transparent multiplexing of
36 * multiple virtual connections from one endpoint to another through
37 * one given real connection to that endpoint.  The input and output
38 * streams for the underlying real connection must be supplied.
39 * A callback object is also supplied to be informed of new virtual
40 * connections opened by the remote endpoint.  After creation, the
41 * run() method must be called in a thread created for demultiplexing
42 * the connections.  The openConnection() method is called to
43 * initiate a virtual connection from this endpoint.
44 *
45 * @author Peter Jones
46 */
47@SuppressWarnings("deprecation")
48final class ConnectionMultiplexer {
49
50    /** "multiplex" log level */
51    static int logLevel = LogStream.parseLevel(getLogLevel());
52
53    private static String getLogLevel() {
54        return java.security.AccessController.doPrivileged(
55           (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.multiplex.logLevel"));
56    }
57
58    /* multiplex system log */
59    static final Log multiplexLog =
60        Log.getLog("sun.rmi.transport.tcp.multiplex",
61                   "multiplex", ConnectionMultiplexer.logLevel);
62
63    /** multiplexing protocol operation codes */
64    private final static int OPEN     = 0xE1;
65    private final static int CLOSE    = 0xE2;
66    private final static int CLOSEACK = 0xE3;
67    private final static int REQUEST  = 0xE4;
68    private final static int TRANSMIT = 0xE5;
69
70    /** object to notify for new connections from remote endpoint */
71    private TCPChannel channel;
72
73    /** input stream for underlying single connection */
74    private InputStream in;
75
76    /** output stream for underlying single connection */
77    private OutputStream out;
78
79    /** true if underlying connection originated from this endpoint
80        (used for generating unique connection IDs) */
81    private boolean orig;
82
83    /** layered stream for reading formatted data from underlying connection */
84    private DataInputStream dataIn;
85
86    /** layered stream for writing formatted data to underlying connection */
87    private DataOutputStream dataOut;
88
89    /** table holding currently open connection IDs and related info */
90    private Hashtable<Integer, MultiplexConnectionInfo> connectionTable = new Hashtable<>(7);
91
92    /** number of currently open connections */
93    private int numConnections = 0;
94
95    /** maximum allowed open connections */
96    private final static int maxConnections = 256;
97
98    /** ID of last connection opened */
99    private int lastID = 0x1001;
100
101    /** true if this mechanism is still alive */
102    private boolean alive = true;
103
104    /**
105     * Create a new ConnectionMultiplexer using the given underlying
106     * input/output stream pair.  The run method must be called
107     * (possibly on a new thread) to handle the demultiplexing.
108     * @param channel object to notify when new connection is received
109     * @param in input stream of underlying connection
110     * @param out output stream of underlying connection
111     * @param orig true if this endpoint intiated the underlying
112     *        connection (needs to be set differently at both ends)
113     */
114    public ConnectionMultiplexer(
115        TCPChannel    channel,
116        InputStream   in,
117        OutputStream  out,
118        boolean       orig)
119    {
120        this.channel = channel;
121        this.in      = in;
122        this.out     = out;
123        this.orig    = orig;
124
125        dataIn = new DataInputStream(in);
126        dataOut = new DataOutputStream(out);
127    }
128
129    /**
130     * Process multiplexing protocol received from underlying connection.
131     */
132    public void run() throws IOException
133    {
134        try {
135            int op, id, length;
136            MultiplexConnectionInfo info;
137
138            while (true) {
139
140                // read next op code from remote endpoint
141                op = dataIn.readUnsignedByte();
142                switch (op) {
143
144                // remote endpoint initiating new connection
145                case OPEN:
146                    id = dataIn.readUnsignedShort();
147
148                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
149                        multiplexLog.log(Log.VERBOSE, "operation  OPEN " + id);
150                    }
151
152                    info = connectionTable.get(id);
153                    if (info != null)
154                        throw new IOException(
155                            "OPEN: Connection ID already exists");
156                    info = new MultiplexConnectionInfo(id);
157                    info.in = new MultiplexInputStream(this, info, 2048);
158                    info.out = new MultiplexOutputStream(this, info, 2048);
159                    synchronized (connectionTable) {
160                        connectionTable.put(id, info);
161                        ++ numConnections;
162                    }
163                    sun.rmi.transport.Connection conn;
164                    conn = new TCPConnection(channel, info.in, info.out);
165                    channel.acceptMultiplexConnection(conn);
166                    break;
167
168                // remote endpoint closing connection
169                case CLOSE:
170                    id = dataIn.readUnsignedShort();
171
172                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
173                        multiplexLog.log(Log.VERBOSE, "operation  CLOSE " + id);
174                    }
175
176                    info = connectionTable.get(id);
177                    if (info == null)
178                        throw new IOException(
179                            "CLOSE: Invalid connection ID");
180                    info.in.disconnect();
181                    info.out.disconnect();
182                    if (!info.closed)
183                        sendCloseAck(info);
184                    synchronized (connectionTable) {
185                        connectionTable.remove(id);
186                        -- numConnections;
187                    }
188                    break;
189
190                // remote endpoint acknowledging close of connection
191                case CLOSEACK:
192                    id = dataIn.readUnsignedShort();
193
194                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
195                        multiplexLog.log(Log.VERBOSE,
196                            "operation  CLOSEACK " + id);
197                    }
198
199                    info = connectionTable.get(id);
200                    if (info == null)
201                        throw new IOException(
202                            "CLOSEACK: Invalid connection ID");
203                    if (!info.closed)
204                        throw new IOException(
205                            "CLOSEACK: Connection not closed");
206                    info.in.disconnect();
207                    info.out.disconnect();
208                    synchronized (connectionTable) {
209                        connectionTable.remove(id);
210                        -- numConnections;
211                    }
212                    break;
213
214                // remote endpoint declaring additional bytes receivable
215                case REQUEST:
216                    id = dataIn.readUnsignedShort();
217                    info = connectionTable.get(id);
218                    if (info == null)
219                        throw new IOException(
220                            "REQUEST: Invalid connection ID");
221                    length = dataIn.readInt();
222
223                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
224                        multiplexLog.log(Log.VERBOSE,
225                            "operation  REQUEST " + id + ": " + length);
226                    }
227
228                    info.out.request(length);
229                    break;
230
231                // remote endpoint transmitting data packet
232                case TRANSMIT:
233                    id = dataIn.readUnsignedShort();
234                    info = connectionTable.get(id);
235                    if (info == null)
236                        throw new IOException("SEND: Invalid connection ID");
237                    length = dataIn.readInt();
238
239                    if (multiplexLog.isLoggable(Log.VERBOSE)) {
240                        multiplexLog.log(Log.VERBOSE,
241                            "operation  TRANSMIT " + id + ": " + length);
242                    }
243
244                    info.in.receive(length, dataIn);
245                    break;
246
247                default:
248                    throw new IOException("Invalid operation: " +
249                                          Integer.toHexString(op));
250                }
251            }
252        } finally {
253            shutDown();
254        }
255    }
256
257    /**
258     * Initiate a new multiplexed connection through the underlying
259     * connection.
260     */
261    public synchronized TCPConnection openConnection() throws IOException
262    {
263        // generate ID that should not be already used
264        // If all possible 32768 IDs are used,
265        // this method will block searching for a new ID forever.
266        int id;
267        do {
268            lastID = (++ lastID) & 0x7FFF;
269            id = lastID;
270
271            // The orig flag (copied to the high bit of the ID) is used
272            // to have two distinct ranges to choose IDs from for the
273            // two endpoints.
274            if (orig)
275                id |= 0x8000;
276        } while (connectionTable.get(id) != null);
277
278        // create multiplexing streams and bookkeeping information
279        MultiplexConnectionInfo info = new MultiplexConnectionInfo(id);
280        info.in = new MultiplexInputStream(this, info, 2048);
281        info.out = new MultiplexOutputStream(this, info, 2048);
282
283        // add to connection table if multiplexer has not died
284        synchronized (connectionTable) {
285            if (!alive)
286                throw new IOException("Multiplexer connection dead");
287            if (numConnections >= maxConnections)
288                throw new IOException("Cannot exceed " + maxConnections +
289                    " simultaneous multiplexed connections");
290            connectionTable.put(id, info);
291            ++ numConnections;
292        }
293
294        // inform remote endpoint of new connection
295        synchronized (dataOut) {
296            try {
297                dataOut.writeByte(OPEN);
298                dataOut.writeShort(id);
299                dataOut.flush();
300            } catch (IOException e) {
301                multiplexLog.log(Log.BRIEF, "exception: ", e);
302
303                shutDown();
304                throw e;
305            }
306        }
307
308        return new TCPConnection(channel, info.in, info.out);
309    }
310
311    /**
312     * Shut down all connections and clean up.
313     */
314    public void shutDown()
315    {
316        // inform all associated streams
317        synchronized (connectionTable) {
318            // return if multiplexer already officially dead
319            if (!alive)
320                return;
321            alive = false;
322
323            Enumeration<MultiplexConnectionInfo> enum_ =
324                    connectionTable.elements();
325            while (enum_.hasMoreElements()) {
326                MultiplexConnectionInfo info = enum_.nextElement();
327                info.in.disconnect();
328                info.out.disconnect();
329            }
330            connectionTable.clear();
331            numConnections = 0;
332        }
333
334        // close underlying connection, if possible (and not already done)
335        try {
336            in.close();
337        } catch (IOException e) {
338        }
339        try {
340            out.close();
341        } catch (IOException e) {
342        }
343    }
344
345    /**
346     * Send request for more data on connection to remote endpoint.
347     * @param info connection information structure
348     * @param len number of more bytes that can be received
349     */
350    void sendRequest(MultiplexConnectionInfo info, int len) throws IOException
351    {
352        synchronized (dataOut) {
353            if (alive && !info.closed)
354                try {
355                    dataOut.writeByte(REQUEST);
356                    dataOut.writeShort(info.id);
357                    dataOut.writeInt(len);
358                    dataOut.flush();
359                } catch (IOException e) {
360                    multiplexLog.log(Log.BRIEF, "exception: ", e);
361
362                    shutDown();
363                    throw e;
364                }
365        }
366    }
367
368    /**
369     * Send packet of requested data on connection to remote endpoint.
370     * @param info connection information structure
371     * @param buf array containing bytes to send
372     * @param off offset of first array index of packet
373     * @param len number of bytes in packet to send
374     */
375    void sendTransmit(MultiplexConnectionInfo info,
376                      byte buf[], int off, int len) throws IOException
377    {
378        synchronized (dataOut) {
379            if (alive && !info.closed)
380                try {
381                    dataOut.writeByte(TRANSMIT);
382                    dataOut.writeShort(info.id);
383                    dataOut.writeInt(len);
384                    dataOut.write(buf, off, len);
385                    dataOut.flush();
386                } catch (IOException e) {
387                    multiplexLog.log(Log.BRIEF, "exception: ", e);
388
389                    shutDown();
390                    throw e;
391                }
392        }
393    }
394
395    /**
396     * Inform remote endpoint that connection has been closed.
397     * @param info connection information structure
398     */
399    void sendClose(MultiplexConnectionInfo info) throws IOException
400    {
401        info.out.disconnect();
402        synchronized (dataOut) {
403            if (alive && !info.closed)
404                try {
405                    dataOut.writeByte(CLOSE);
406                    dataOut.writeShort(info.id);
407                    dataOut.flush();
408                    info.closed = true;
409                } catch (IOException e) {
410                    multiplexLog.log(Log.BRIEF, "exception: ", e);
411
412                    shutDown();
413                    throw e;
414                }
415        }
416    }
417
418    /**
419     * Acknowledge remote endpoint's closing of connection.
420     * @param info connection information structure
421     */
422    void sendCloseAck(MultiplexConnectionInfo info) throws IOException
423    {
424        synchronized (dataOut) {
425            if (alive && !info.closed)
426                try {
427                    dataOut.writeByte(CLOSEACK);
428                    dataOut.writeShort(info.id);
429                    dataOut.flush();
430                    info.closed = true;
431                } catch (IOException e) {
432                    multiplexLog.log(Log.BRIEF, "exception: ", e);
433
434                    shutDown();
435                    throw e;
436                }
437        }
438    }
439
440    /**
441     * Shut down connection upon finalization.
442     */
443    @SuppressWarnings("deprecation")
444    protected void finalize() throws Throwable
445    {
446        super.finalize();
447        shutDown();
448    }
449}
450