1/*
2 * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package sun.rmi.transport.tcp;
26
27import java.io.DataInputStream;
28import java.io.DataOutputStream;
29import java.io.IOException;
30import java.lang.ref.Reference;
31import java.lang.ref.SoftReference;
32import java.net.Socket;
33import java.rmi.ConnectIOException;
34import java.rmi.RemoteException;
35import java.security.AccessControlContext;
36import java.security.AccessController;
37import java.security.PrivilegedAction;
38import java.util.ArrayList;
39import java.util.List;
40import java.util.ListIterator;
41import java.util.WeakHashMap;
42import java.util.concurrent.Future;
43import java.util.concurrent.ScheduledExecutorService;
44import java.util.concurrent.TimeUnit;
45import sun.rmi.runtime.Log;
46import sun.rmi.runtime.NewThreadAction;
47import sun.rmi.runtime.RuntimeUtil;
48import sun.rmi.transport.Channel;
49import sun.rmi.transport.Connection;
50import sun.rmi.transport.Endpoint;
51import sun.rmi.transport.TransportConstants;
52
53/**
54 * TCPChannel is the socket-based implementation of the RMI Channel
55 * abstraction.
56 *
57 * @author Ann Wollrath
58 */
59public class TCPChannel implements Channel {
60    /** endpoint for this channel */
61    private final TCPEndpoint ep;
62    /** transport for this channel */
63    private final TCPTransport tr;
64    /** list of cached connections */
65    private final List<TCPConnection> freeList =
66        new ArrayList<>();
67    /** frees cached connections that have expired (guarded by freeList) */
68    private Future<?> reaper = null;
69
70    /** connection acceptor (should be in TCPTransport) */
71    private ConnectionAcceptor acceptor;
72
73    /** most recently authorized AccessControlContext */
74    private AccessControlContext okContext;
75
76    /** cache of authorized AccessControlContexts */
77    private WeakHashMap<AccessControlContext,
78                        Reference<AccessControlContext>> authcache;
79
80    /** the SecurityManager which authorized okContext and authcache */
81    private SecurityManager cacheSecurityManager = null;
82
83    /** client-side connection idle usage timeout */
84    private static final long idleTimeout =             // default 15 seconds
85        AccessController.doPrivileged((PrivilegedAction<Long>) () ->
86            Long.getLong("sun.rmi.transport.connectionTimeout", 15000));
87
88    /** client-side connection handshake read timeout */
89    private static final int handshakeTimeout =         // default 1 minute
90        AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
91            Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000));
92
93    /** client-side connection response read timeout (after handshake) */
94    private static final int responseTimeout =          // default infinity
95        AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
96            Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0));
97
98    /** thread pool for scheduling delayed tasks */
99    private static final ScheduledExecutorService scheduler =
100        AccessController.doPrivileged(
101            new RuntimeUtil.GetInstanceAction()).getScheduler();
102
103    /**
104     * Create channel for endpoint.
105     */
106    TCPChannel(TCPTransport tr, TCPEndpoint ep) {
107        this.tr = tr;
108        this.ep = ep;
109    }
110
111    /**
112     * Return the endpoint for this channel.
113     */
114    public Endpoint getEndpoint() {
115        return ep;
116    }
117
118    /**
119     * Checks if the current caller has sufficient privilege to make
120     * a connection to the remote endpoint.
121     * @exception SecurityException if caller is not allowed to use this
122     * Channel.
123     */
124    private void checkConnectPermission() throws SecurityException {
125        SecurityManager security = System.getSecurityManager();
126        if (security == null)
127            return;
128
129        if (security != cacheSecurityManager) {
130            // The security manager changed: flush the cache
131            okContext = null;
132            authcache = new WeakHashMap<AccessControlContext,
133                                        Reference<AccessControlContext>>();
134            cacheSecurityManager = security;
135        }
136
137        AccessControlContext ctx = AccessController.getContext();
138
139        // If ctx is the same context as last time, or if it
140        // appears in the cache, bypass the checkConnect.
141        if (okContext == null ||
142            !(okContext.equals(ctx) || authcache.containsKey(ctx)))
143        {
144            security.checkConnect(ep.getHost(), ep.getPort());
145            authcache.put(ctx, new SoftReference<AccessControlContext>(ctx));
146            // A WeakHashMap is transformed into a SoftHashSet by making
147            // each value softly refer to its own key (Peter's idea).
148        }
149        okContext = ctx;
150    }
151
152    /**
153     * Supplies a connection to the endpoint of the address space
154     * for which this is a channel.  The returned connection may
155     * be one retrieved from a cache of idle connections.
156     */
157    public Connection newConnection() throws RemoteException {
158        TCPConnection conn;
159
160        // loop until we find a free live connection (in which case
161        // we return) or until we run out of freelist (in which case
162        // the loop exits)
163        do {
164            conn = null;
165            // try to get a free connection
166            synchronized (freeList) {
167                int elementPos = freeList.size()-1;
168
169                if (elementPos >= 0) {
170                    // If there is a security manager, make sure
171                    // the caller is allowed to connect to the
172                    // requested endpoint.
173                    checkConnectPermission();
174                    conn = freeList.get(elementPos);
175                    freeList.remove(elementPos);
176                }
177            }
178
179            // at this point, conn is null iff the freelist is empty,
180            // and nonnull if a free connection of uncertain vitality
181            // has been found.
182
183            if (conn != null) {
184                // check to see if the connection has closed since last use
185                if (!conn.isDead()) {
186                    TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
187                    return conn;
188                }
189
190                // conn is dead, and cannot be reused (reuse => false)
191                this.free(conn, false);
192            }
193        } while (conn != null);
194
195        // none free, so create a new connection
196        return (createConnection());
197    }
198
199    /**
200     * Create a new connection to the remote endpoint of this channel.
201     * The returned connection is new.  The caller must already have
202     * passed a security checkConnect or equivalent.
203     */
204    private Connection createConnection() throws RemoteException {
205        Connection conn;
206
207        TCPTransport.tcpLog.log(Log.BRIEF, "create connection");
208
209        Socket sock = ep.newSocket();
210        conn = new TCPConnection(this, sock);
211
212        try {
213            DataOutputStream out =
214                new DataOutputStream(conn.getOutputStream());
215            writeTransportHeader(out);
216
217            // choose protocol (single op if not reusable socket)
218            if (!conn.isReusable()) {
219                out.writeByte(TransportConstants.SingleOpProtocol);
220            } else {
221                out.writeByte(TransportConstants.StreamProtocol);
222                out.flush();
223
224                /*
225                 * Set socket read timeout to configured value for JRMP
226                 * connection handshake; this also serves to guard against
227                 * non-JRMP servers that do not respond (see 4322806).
228                 */
229                int originalSoTimeout = 0;
230                try {
231                    originalSoTimeout = sock.getSoTimeout();
232                    sock.setSoTimeout(handshakeTimeout);
233                } catch (Exception e) {
234                    // if we fail to set this, ignore and proceed anyway
235                }
236
237                DataInputStream in =
238                    new DataInputStream(conn.getInputStream());
239                byte ack = in.readByte();
240                if (ack != TransportConstants.ProtocolAck) {
241                    throw new ConnectIOException(
242                        ack == TransportConstants.ProtocolNack ?
243                        "JRMP StreamProtocol not supported by server" :
244                        "non-JRMP server at remote endpoint");
245                }
246
247                String suggestedHost = in.readUTF();
248                int    suggestedPort = in.readInt();
249                if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
250                    TCPTransport.tcpLog.log(Log.VERBOSE,
251                        "server suggested " + suggestedHost + ":" +
252                        suggestedPort);
253                }
254
255                // set local host name, if unknown
256                TCPEndpoint.setLocalHost(suggestedHost);
257                // do NOT set the default port, because we don't
258                // know if we can't listen YET...
259
260                // write out default endpoint to match protocol
261                // (but it serves no purpose)
262                TCPEndpoint localEp =
263                    TCPEndpoint.getLocalEndpoint(0, null, null);
264                out.writeUTF(localEp.getHost());
265                out.writeInt(localEp.getPort());
266                if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
267                    TCPTransport.tcpLog.log(Log.VERBOSE, "using " +
268                        localEp.getHost() + ":" + localEp.getPort());
269                }
270
271                /*
272                 * After JRMP handshake, set socket read timeout to value
273                 * configured for the rest of the lifetime of the
274                 * connection.  NOTE: this timeout, if configured to a
275                 * finite duration, places an upper bound on the time
276                 * that a remote method call is permitted to execute.
277                 */
278                try {
279                    /*
280                     * If socket factory had set a non-zero timeout on its
281                     * own, then restore it instead of using the property-
282                     * configured value.
283                     */
284                    sock.setSoTimeout((originalSoTimeout != 0 ?
285                                       originalSoTimeout :
286                                       responseTimeout));
287                } catch (Exception e) {
288                    // if we fail to set this, ignore and proceed anyway
289                }
290
291                out.flush();
292            }
293        } catch (IOException e) {
294            try {
295                conn.close();
296            } catch (Exception ex) {}
297            if (e instanceof RemoteException) {
298                throw (RemoteException) e;
299            } else {
300                throw new ConnectIOException(
301                    "error during JRMP connection establishment", e);
302            }
303        }
304        return conn;
305    }
306
307    /**
308     * Free the connection generated by this channel.
309     * @param conn The connection
310     * @param reuse If true, the connection is in a state in which it
311     *        can be reused for another method call.
312     */
313    public void free(Connection conn, boolean reuse) {
314        if (conn == null) return;
315
316        if (reuse && conn.isReusable()) {
317            long lastuse = System.currentTimeMillis();
318            TCPConnection tcpConnection = (TCPConnection) conn;
319
320            TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection");
321
322            /*
323             * Cache connection; if reaper task for expired
324             * connections isn't scheduled, then schedule it.
325             */
326            synchronized (freeList) {
327                freeList.add(tcpConnection);
328                if (reaper == null) {
329                    TCPTransport.tcpLog.log(Log.BRIEF, "create reaper");
330
331                    reaper = scheduler.scheduleWithFixedDelay(
332                        new Runnable() {
333                            public void run() {
334                                TCPTransport.tcpLog.log(Log.VERBOSE,
335                                                        "wake up");
336                                freeCachedConnections();
337                            }
338                        }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS);
339                }
340            }
341
342            tcpConnection.setLastUseTime(lastuse);
343            tcpConnection.setExpiration(lastuse + idleTimeout);
344        } else {
345            TCPTransport.tcpLog.log(Log.BRIEF, "close connection");
346
347            try {
348                conn.close();
349            } catch (IOException ignored) {
350            }
351        }
352    }
353
354    /**
355     * Send transport header over stream.
356     */
357    private void writeTransportHeader(DataOutputStream out)
358        throws RemoteException
359    {
360        try {
361            // write out transport header
362            DataOutputStream dataOut =
363                new DataOutputStream(out);
364            dataOut.writeInt(TransportConstants.Magic);
365            dataOut.writeShort(TransportConstants.Version);
366        } catch (IOException e) {
367            throw new ConnectIOException(
368                "error writing JRMP transport header", e);
369        }
370    }
371
372    /**
373     * Closes all the connections in the cache, whether timed out or not.
374     */
375    public void shedCache() {
376        // Build a list of connections, to avoid holding the freeList
377        // lock during (potentially long-running) close() calls.
378        Connection[] conn;
379        synchronized (freeList) {
380            conn = freeList.toArray(new Connection[freeList.size()]);
381            freeList.clear();
382        }
383
384        // Close all the connections that were free
385        for (int i = conn.length; --i >= 0; ) {
386            Connection c = conn[i];
387            conn[i] = null; // help gc
388            try {
389                c.close();
390            } catch (java.io.IOException e) {
391                // eat exception
392            }
393        }
394    }
395
396    private void freeCachedConnections() {
397        /*
398         * Remove each connection whose time out has expired.
399         */
400        synchronized (freeList) {
401            int size = freeList.size();
402
403            if (size > 0) {
404                long time = System.currentTimeMillis();
405                ListIterator<TCPConnection> iter = freeList.listIterator(size);
406
407                while (iter.hasPrevious()) {
408                    TCPConnection conn = iter.previous();
409                    if (conn.expired(time)) {
410                        TCPTransport.tcpLog.log(Log.VERBOSE,
411                            "connection timeout expired");
412
413                        try {
414                            conn.close();
415                        } catch (java.io.IOException e) {
416                            // eat exception
417                        }
418                        iter.remove();
419                    }
420                }
421            }
422
423            if (freeList.isEmpty()) {
424                reaper.cancel(false);
425                reaper = null;
426            }
427        }
428    }
429}
430
431/**
432 * ConnectionAcceptor manages accepting new connections and giving them
433 * to TCPTransport's message handler on new threads.
434 *
435 * Since this object only needs to know which transport to give new
436 * connections to, it doesn't need to be per-channel as currently
437 * implemented.
438 */
439class ConnectionAcceptor implements Runnable {
440
441    /** transport that will handle message on accepted connections */
442    private TCPTransport transport;
443
444    /** queue of connections to be accepted */
445    private List<Connection> queue = new ArrayList<>();
446
447    /** thread ID counter */
448    private static int threadNum = 0;
449
450    /**
451     * Create a new ConnectionAcceptor that will give connections
452     * to the specified transport on a new thread.
453     */
454    public ConnectionAcceptor(TCPTransport transport) {
455        this.transport = transport;
456    }
457
458    /**
459     * Start a new thread to accept connections.
460     */
461    public void startNewAcceptor() {
462        Thread t = AccessController.doPrivileged(
463            new NewThreadAction(ConnectionAcceptor.this,
464                                "TCPChannel Accept-" + ++ threadNum,
465                                true));
466        t.start();
467    }
468
469    /**
470     * Add connection to queue of connections to be accepted.
471     */
472    public void accept(Connection conn) {
473        synchronized (queue) {
474            queue.add(conn);
475            queue.notify();
476        }
477    }
478
479    /**
480     * Give transport next accepted connection, when available.
481     */
482    public void run() {
483        Connection conn;
484
485        synchronized (queue) {
486            while (queue.size() == 0) {
487                try {
488                    queue.wait();
489                } catch (InterruptedException e) {
490                }
491            }
492            startNewAcceptor();
493            conn = queue.remove(0);
494        }
495
496        transport.handleMessages(conn, true);
497    }
498}
499