1/* 2 * Copyright (c) 1996, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25package sun.rmi.transport.tcp; 26 27import java.io.DataInputStream; 28import java.io.DataOutputStream; 29import java.io.IOException; 30import java.lang.ref.Reference; 31import java.lang.ref.SoftReference; 32import java.net.Socket; 33import java.rmi.ConnectIOException; 34import java.rmi.RemoteException; 35import java.security.AccessControlContext; 36import java.security.AccessController; 37import java.security.PrivilegedAction; 38import java.util.ArrayList; 39import java.util.List; 40import java.util.ListIterator; 41import java.util.WeakHashMap; 42import java.util.concurrent.Future; 43import java.util.concurrent.ScheduledExecutorService; 44import java.util.concurrent.TimeUnit; 45import sun.rmi.runtime.Log; 46import sun.rmi.runtime.NewThreadAction; 47import sun.rmi.runtime.RuntimeUtil; 48import sun.rmi.transport.Channel; 49import sun.rmi.transport.Connection; 50import sun.rmi.transport.Endpoint; 51import sun.rmi.transport.TransportConstants; 52 53/** 54 * TCPChannel is the socket-based implementation of the RMI Channel 55 * abstraction. 56 * 57 * @author Ann Wollrath 58 */ 59public class TCPChannel implements Channel { 60 /** endpoint for this channel */ 61 private final TCPEndpoint ep; 62 /** transport for this channel */ 63 private final TCPTransport tr; 64 /** list of cached connections */ 65 private final List<TCPConnection> freeList = 66 new ArrayList<>(); 67 /** frees cached connections that have expired (guarded by freeList) */ 68 private Future<?> reaper = null; 69 70 /** connection acceptor (should be in TCPTransport) */ 71 private ConnectionAcceptor acceptor; 72 73 /** most recently authorized AccessControlContext */ 74 private AccessControlContext okContext; 75 76 /** cache of authorized AccessControlContexts */ 77 private WeakHashMap<AccessControlContext, 78 Reference<AccessControlContext>> authcache; 79 80 /** the SecurityManager which authorized okContext and authcache */ 81 private SecurityManager cacheSecurityManager = null; 82 83 /** client-side connection idle usage timeout */ 84 private static final long idleTimeout = // default 15 seconds 85 AccessController.doPrivileged((PrivilegedAction<Long>) () -> 86 Long.getLong("sun.rmi.transport.connectionTimeout", 15000)); 87 88 /** client-side connection handshake read timeout */ 89 private static final int handshakeTimeout = // default 1 minute 90 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 91 Integer.getInteger("sun.rmi.transport.tcp.handshakeTimeout", 60000)); 92 93 /** client-side connection response read timeout (after handshake) */ 94 private static final int responseTimeout = // default infinity 95 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 96 Integer.getInteger("sun.rmi.transport.tcp.responseTimeout", 0)); 97 98 /** thread pool for scheduling delayed tasks */ 99 private static final ScheduledExecutorService scheduler = 100 AccessController.doPrivileged( 101 new RuntimeUtil.GetInstanceAction()).getScheduler(); 102 103 /** 104 * Create channel for endpoint. 105 */ 106 TCPChannel(TCPTransport tr, TCPEndpoint ep) { 107 this.tr = tr; 108 this.ep = ep; 109 } 110 111 /** 112 * Return the endpoint for this channel. 113 */ 114 public Endpoint getEndpoint() { 115 return ep; 116 } 117 118 /** 119 * Checks if the current caller has sufficient privilege to make 120 * a connection to the remote endpoint. 121 * @exception SecurityException if caller is not allowed to use this 122 * Channel. 123 */ 124 private void checkConnectPermission() throws SecurityException { 125 SecurityManager security = System.getSecurityManager(); 126 if (security == null) 127 return; 128 129 if (security != cacheSecurityManager) { 130 // The security manager changed: flush the cache 131 okContext = null; 132 authcache = new WeakHashMap<AccessControlContext, 133 Reference<AccessControlContext>>(); 134 cacheSecurityManager = security; 135 } 136 137 AccessControlContext ctx = AccessController.getContext(); 138 139 // If ctx is the same context as last time, or if it 140 // appears in the cache, bypass the checkConnect. 141 if (okContext == null || 142 !(okContext.equals(ctx) || authcache.containsKey(ctx))) 143 { 144 security.checkConnect(ep.getHost(), ep.getPort()); 145 authcache.put(ctx, new SoftReference<AccessControlContext>(ctx)); 146 // A WeakHashMap is transformed into a SoftHashSet by making 147 // each value softly refer to its own key (Peter's idea). 148 } 149 okContext = ctx; 150 } 151 152 /** 153 * Supplies a connection to the endpoint of the address space 154 * for which this is a channel. The returned connection may 155 * be one retrieved from a cache of idle connections. 156 */ 157 public Connection newConnection() throws RemoteException { 158 TCPConnection conn; 159 160 // loop until we find a free live connection (in which case 161 // we return) or until we run out of freelist (in which case 162 // the loop exits) 163 do { 164 conn = null; 165 // try to get a free connection 166 synchronized (freeList) { 167 int elementPos = freeList.size()-1; 168 169 if (elementPos >= 0) { 170 // If there is a security manager, make sure 171 // the caller is allowed to connect to the 172 // requested endpoint. 173 checkConnectPermission(); 174 conn = freeList.get(elementPos); 175 freeList.remove(elementPos); 176 } 177 } 178 179 // at this point, conn is null iff the freelist is empty, 180 // and nonnull if a free connection of uncertain vitality 181 // has been found. 182 183 if (conn != null) { 184 // check to see if the connection has closed since last use 185 if (!conn.isDead()) { 186 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 187 return conn; 188 } 189 190 // conn is dead, and cannot be reused (reuse => false) 191 this.free(conn, false); 192 } 193 } while (conn != null); 194 195 // none free, so create a new connection 196 return (createConnection()); 197 } 198 199 /** 200 * Create a new connection to the remote endpoint of this channel. 201 * The returned connection is new. The caller must already have 202 * passed a security checkConnect or equivalent. 203 */ 204 private Connection createConnection() throws RemoteException { 205 Connection conn; 206 207 TCPTransport.tcpLog.log(Log.BRIEF, "create connection"); 208 209 Socket sock = ep.newSocket(); 210 conn = new TCPConnection(this, sock); 211 212 try { 213 DataOutputStream out = 214 new DataOutputStream(conn.getOutputStream()); 215 writeTransportHeader(out); 216 217 // choose protocol (single op if not reusable socket) 218 if (!conn.isReusable()) { 219 out.writeByte(TransportConstants.SingleOpProtocol); 220 } else { 221 out.writeByte(TransportConstants.StreamProtocol); 222 out.flush(); 223 224 /* 225 * Set socket read timeout to configured value for JRMP 226 * connection handshake; this also serves to guard against 227 * non-JRMP servers that do not respond (see 4322806). 228 */ 229 int originalSoTimeout = 0; 230 try { 231 originalSoTimeout = sock.getSoTimeout(); 232 sock.setSoTimeout(handshakeTimeout); 233 } catch (Exception e) { 234 // if we fail to set this, ignore and proceed anyway 235 } 236 237 DataInputStream in = 238 new DataInputStream(conn.getInputStream()); 239 byte ack = in.readByte(); 240 if (ack != TransportConstants.ProtocolAck) { 241 throw new ConnectIOException( 242 ack == TransportConstants.ProtocolNack ? 243 "JRMP StreamProtocol not supported by server" : 244 "non-JRMP server at remote endpoint"); 245 } 246 247 String suggestedHost = in.readUTF(); 248 int suggestedPort = in.readInt(); 249 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 250 TCPTransport.tcpLog.log(Log.VERBOSE, 251 "server suggested " + suggestedHost + ":" + 252 suggestedPort); 253 } 254 255 // set local host name, if unknown 256 TCPEndpoint.setLocalHost(suggestedHost); 257 // do NOT set the default port, because we don't 258 // know if we can't listen YET... 259 260 // write out default endpoint to match protocol 261 // (but it serves no purpose) 262 TCPEndpoint localEp = 263 TCPEndpoint.getLocalEndpoint(0, null, null); 264 out.writeUTF(localEp.getHost()); 265 out.writeInt(localEp.getPort()); 266 if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) { 267 TCPTransport.tcpLog.log(Log.VERBOSE, "using " + 268 localEp.getHost() + ":" + localEp.getPort()); 269 } 270 271 /* 272 * After JRMP handshake, set socket read timeout to value 273 * configured for the rest of the lifetime of the 274 * connection. NOTE: this timeout, if configured to a 275 * finite duration, places an upper bound on the time 276 * that a remote method call is permitted to execute. 277 */ 278 try { 279 /* 280 * If socket factory had set a non-zero timeout on its 281 * own, then restore it instead of using the property- 282 * configured value. 283 */ 284 sock.setSoTimeout((originalSoTimeout != 0 ? 285 originalSoTimeout : 286 responseTimeout)); 287 } catch (Exception e) { 288 // if we fail to set this, ignore and proceed anyway 289 } 290 291 out.flush(); 292 } 293 } catch (IOException e) { 294 try { 295 conn.close(); 296 } catch (Exception ex) {} 297 if (e instanceof RemoteException) { 298 throw (RemoteException) e; 299 } else { 300 throw new ConnectIOException( 301 "error during JRMP connection establishment", e); 302 } 303 } 304 return conn; 305 } 306 307 /** 308 * Free the connection generated by this channel. 309 * @param conn The connection 310 * @param reuse If true, the connection is in a state in which it 311 * can be reused for another method call. 312 */ 313 public void free(Connection conn, boolean reuse) { 314 if (conn == null) return; 315 316 if (reuse && conn.isReusable()) { 317 long lastuse = System.currentTimeMillis(); 318 TCPConnection tcpConnection = (TCPConnection) conn; 319 320 TCPTransport.tcpLog.log(Log.BRIEF, "reuse connection"); 321 322 /* 323 * Cache connection; if reaper task for expired 324 * connections isn't scheduled, then schedule it. 325 */ 326 synchronized (freeList) { 327 freeList.add(tcpConnection); 328 if (reaper == null) { 329 TCPTransport.tcpLog.log(Log.BRIEF, "create reaper"); 330 331 reaper = scheduler.scheduleWithFixedDelay( 332 new Runnable() { 333 public void run() { 334 TCPTransport.tcpLog.log(Log.VERBOSE, 335 "wake up"); 336 freeCachedConnections(); 337 } 338 }, idleTimeout, idleTimeout, TimeUnit.MILLISECONDS); 339 } 340 } 341 342 tcpConnection.setLastUseTime(lastuse); 343 tcpConnection.setExpiration(lastuse + idleTimeout); 344 } else { 345 TCPTransport.tcpLog.log(Log.BRIEF, "close connection"); 346 347 try { 348 conn.close(); 349 } catch (IOException ignored) { 350 } 351 } 352 } 353 354 /** 355 * Send transport header over stream. 356 */ 357 private void writeTransportHeader(DataOutputStream out) 358 throws RemoteException 359 { 360 try { 361 // write out transport header 362 DataOutputStream dataOut = 363 new DataOutputStream(out); 364 dataOut.writeInt(TransportConstants.Magic); 365 dataOut.writeShort(TransportConstants.Version); 366 } catch (IOException e) { 367 throw new ConnectIOException( 368 "error writing JRMP transport header", e); 369 } 370 } 371 372 /** 373 * Closes all the connections in the cache, whether timed out or not. 374 */ 375 public void shedCache() { 376 // Build a list of connections, to avoid holding the freeList 377 // lock during (potentially long-running) close() calls. 378 Connection[] conn; 379 synchronized (freeList) { 380 conn = freeList.toArray(new Connection[freeList.size()]); 381 freeList.clear(); 382 } 383 384 // Close all the connections that were free 385 for (int i = conn.length; --i >= 0; ) { 386 Connection c = conn[i]; 387 conn[i] = null; // help gc 388 try { 389 c.close(); 390 } catch (java.io.IOException e) { 391 // eat exception 392 } 393 } 394 } 395 396 private void freeCachedConnections() { 397 /* 398 * Remove each connection whose time out has expired. 399 */ 400 synchronized (freeList) { 401 int size = freeList.size(); 402 403 if (size > 0) { 404 long time = System.currentTimeMillis(); 405 ListIterator<TCPConnection> iter = freeList.listIterator(size); 406 407 while (iter.hasPrevious()) { 408 TCPConnection conn = iter.previous(); 409 if (conn.expired(time)) { 410 TCPTransport.tcpLog.log(Log.VERBOSE, 411 "connection timeout expired"); 412 413 try { 414 conn.close(); 415 } catch (java.io.IOException e) { 416 // eat exception 417 } 418 iter.remove(); 419 } 420 } 421 } 422 423 if (freeList.isEmpty()) { 424 reaper.cancel(false); 425 reaper = null; 426 } 427 } 428 } 429} 430 431/** 432 * ConnectionAcceptor manages accepting new connections and giving them 433 * to TCPTransport's message handler on new threads. 434 * 435 * Since this object only needs to know which transport to give new 436 * connections to, it doesn't need to be per-channel as currently 437 * implemented. 438 */ 439class ConnectionAcceptor implements Runnable { 440 441 /** transport that will handle message on accepted connections */ 442 private TCPTransport transport; 443 444 /** queue of connections to be accepted */ 445 private List<Connection> queue = new ArrayList<>(); 446 447 /** thread ID counter */ 448 private static int threadNum = 0; 449 450 /** 451 * Create a new ConnectionAcceptor that will give connections 452 * to the specified transport on a new thread. 453 */ 454 public ConnectionAcceptor(TCPTransport transport) { 455 this.transport = transport; 456 } 457 458 /** 459 * Start a new thread to accept connections. 460 */ 461 public void startNewAcceptor() { 462 Thread t = AccessController.doPrivileged( 463 new NewThreadAction(ConnectionAcceptor.this, 464 "TCPChannel Accept-" + ++ threadNum, 465 true)); 466 t.start(); 467 } 468 469 /** 470 * Add connection to queue of connections to be accepted. 471 */ 472 public void accept(Connection conn) { 473 synchronized (queue) { 474 queue.add(conn); 475 queue.notify(); 476 } 477 } 478 479 /** 480 * Give transport next accepted connection, when available. 481 */ 482 public void run() { 483 Connection conn; 484 485 synchronized (queue) { 486 while (queue.size() == 0) { 487 try { 488 queue.wait(); 489 } catch (InterruptedException e) { 490 } 491 } 492 startNewAcceptor(); 493 conn = queue.remove(0); 494 } 495 496 transport.handleMessages(conn, true); 497 } 498} 499