1/* 2 * Copyright (c) 1996, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25package sun.rmi.transport.tcp; 26 27import java.lang.ref.Reference; 28import java.lang.ref.SoftReference; 29import java.lang.ref.WeakReference; 30import java.lang.reflect.InvocationTargetException; 31import java.lang.reflect.UndeclaredThrowableException; 32import java.io.DataInputStream; 33import java.io.DataOutputStream; 34import java.io.IOException; 35import java.io.InputStream; 36import java.io.OutputStream; 37import java.io.BufferedInputStream; 38import java.io.BufferedOutputStream; 39import java.net.InetAddress; 40import java.net.ServerSocket; 41import java.net.Socket; 42import java.rmi.RemoteException; 43import java.rmi.server.ExportException; 44import java.rmi.server.LogStream; 45import java.rmi.server.RMIFailureHandler; 46import java.rmi.server.RMISocketFactory; 47import java.rmi.server.RemoteCall; 48import java.rmi.server.ServerNotActiveException; 49import java.rmi.server.UID; 50import java.security.AccessControlContext; 51import java.security.AccessController; 52import java.security.Permissions; 53import java.security.PrivilegedAction; 54import java.security.ProtectionDomain; 55import java.util.ArrayList; 56import java.util.LinkedList; 57import java.util.List; 58import java.util.Map; 59import java.util.WeakHashMap; 60import java.util.logging.Level; 61import java.util.concurrent.ExecutorService; 62import java.util.concurrent.RejectedExecutionException; 63import java.util.concurrent.SynchronousQueue; 64import java.util.concurrent.ThreadFactory; 65import java.util.concurrent.ThreadPoolExecutor; 66import java.util.concurrent.TimeUnit; 67import java.util.concurrent.atomic.AtomicInteger; 68import sun.rmi.runtime.Log; 69import sun.rmi.runtime.NewThreadAction; 70import sun.rmi.transport.Channel; 71import sun.rmi.transport.Connection; 72import sun.rmi.transport.DGCAckHandler; 73import sun.rmi.transport.Endpoint; 74import sun.rmi.transport.StreamRemoteCall; 75import sun.rmi.transport.Target; 76import sun.rmi.transport.Transport; 77import sun.rmi.transport.TransportConstants; 78 79/** 80 * TCPTransport is the socket-based implementation of the RMI Transport 81 * abstraction. 82 * 83 * @author Ann Wollrath 84 * @author Peter Jones 85 */ 86@SuppressWarnings("deprecation") 87public class TCPTransport extends Transport { 88 89 /* tcp package log */ 90 static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp", 91 LogStream.parseLevel(AccessController.doPrivileged( 92 (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.logLevel")))); 93 94 /** maximum number of connection handler threads */ 95 private static final int maxConnectionThreads = // default no limit 96 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 97 Integer.getInteger("sun.rmi.transport.tcp.maxConnectionThreads", 98 Integer.MAX_VALUE)); 99 100 /** keep alive time for idle connection handler threads */ 101 private static final long threadKeepAliveTime = // default 1 minute 102 AccessController.doPrivileged((PrivilegedAction<Long>) () -> 103 Long.getLong("sun.rmi.transport.tcp.threadKeepAliveTime", 60000)); 104 105 /** enable multiplexing protocol */ 106 private static final boolean enableMultiplexProtocol = // default false 107 AccessController.doPrivileged((PrivilegedAction<Boolean>) () -> 108 Boolean.getBoolean("sun.rmi.transport.tcp.enableMultiplexProtocol")); 109 110 /** thread pool for connection handlers */ 111 private static final ExecutorService connectionThreadPool = 112 new ThreadPoolExecutor(0, maxConnectionThreads, 113 threadKeepAliveTime, TimeUnit.MILLISECONDS, 114 new SynchronousQueue<Runnable>(), 115 new ThreadFactory() { 116 public Thread newThread(Runnable runnable) { 117 return AccessController.doPrivileged(new NewThreadAction( 118 runnable, "TCP Connection(idle)", true, true)); 119 } 120 }); 121 122 /** total connections handled */ 123 private static final AtomicInteger connectionCount = new AtomicInteger(0); 124 125 /** client host for the current thread's connection */ 126 private static final ThreadLocal<ConnectionHandler> 127 threadConnectionHandler = new ThreadLocal<>(); 128 129 /** an AccessControlContext with no permissions */ 130 private static final AccessControlContext NOPERMS_ACC; 131 static { 132 Permissions perms = new Permissions(); 133 ProtectionDomain[] pd = { new ProtectionDomain(null, perms) }; 134 NOPERMS_ACC = new AccessControlContext(pd); 135 } 136 137 /** endpoints for this transport */ 138 private final LinkedList<TCPEndpoint> epList; 139 /** number of objects exported on this transport */ 140 private int exportCount = 0; 141 /** server socket for this transport */ 142 private ServerSocket server = null; 143 /** table mapping endpoints to channels */ 144 private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable = 145 new WeakHashMap<>(); 146 147 static final RMISocketFactory defaultSocketFactory = 148 RMISocketFactory.getDefaultSocketFactory(); 149 150 /** number of milliseconds in accepted-connection timeout. 151 * Warning: this should be greater than 15 seconds (the client-side 152 * timeout), and defaults to 2 hours. 153 * The maximum representable value is slightly more than 24 days 154 * and 20 hours. 155 */ 156 private static final int connectionReadTimeout = // default 2 hours 157 AccessController.doPrivileged((PrivilegedAction<Integer>) () -> 158 Integer.getInteger("sun.rmi.transport.tcp.readTimeout", 2 * 3600 * 1000)); 159 160 /** 161 * Constructs a TCPTransport. 162 */ 163 TCPTransport(LinkedList<TCPEndpoint> epList) { 164 // assert ((epList.size() != null) && (epList.size() >= 1)) 165 this.epList = epList; 166 if (tcpLog.isLoggable(Log.BRIEF)) { 167 tcpLog.log(Log.BRIEF, "Version = " + 168 TransportConstants.Version + ", ep = " + getEndpoint()); 169 } 170 } 171 172 /** 173 * Closes all cached connections in every channel subordinated to this 174 * transport. Currently, this only closes outgoing connections. 175 */ 176 public void shedConnectionCaches() { 177 List<TCPChannel> channels; 178 synchronized (channelTable) { 179 channels = new ArrayList<TCPChannel>(channelTable.values().size()); 180 for (Reference<TCPChannel> ref : channelTable.values()) { 181 TCPChannel ch = ref.get(); 182 if (ch != null) { 183 channels.add(ch); 184 } 185 } 186 } 187 for (TCPChannel channel : channels) { 188 channel.shedCache(); 189 } 190 } 191 192 /** 193 * Returns a <I>Channel</I> that generates connections to the 194 * endpoint <I>ep</I>. A Channel is an object that creates and 195 * manages connections of a particular type to some particular 196 * address space. 197 * @param ep the endpoint to which connections will be generated. 198 * @return the channel or null if the transport cannot 199 * generate connections to this endpoint 200 */ 201 public TCPChannel getChannel(Endpoint ep) { 202 TCPChannel ch = null; 203 if (ep instanceof TCPEndpoint) { 204 synchronized (channelTable) { 205 Reference<TCPChannel> ref = channelTable.get(ep); 206 if (ref != null) { 207 ch = ref.get(); 208 } 209 if (ch == null) { 210 TCPEndpoint tcpEndpoint = (TCPEndpoint) ep; 211 ch = new TCPChannel(this, tcpEndpoint); 212 channelTable.put(tcpEndpoint, 213 new WeakReference<TCPChannel>(ch)); 214 } 215 } 216 } 217 return ch; 218 } 219 220 /** 221 * Removes the <I>Channel</I> that generates connections to the 222 * endpoint <I>ep</I>. 223 */ 224 public void free(Endpoint ep) { 225 if (ep instanceof TCPEndpoint) { 226 synchronized (channelTable) { 227 Reference<TCPChannel> ref = channelTable.remove(ep); 228 if (ref != null) { 229 TCPChannel channel = ref.get(); 230 if (channel != null) { 231 channel.shedCache(); 232 } 233 } 234 } 235 } 236 } 237 238 /** 239 * Export the object so that it can accept incoming calls. 240 */ 241 public void exportObject(Target target) throws RemoteException { 242 /* 243 * Ensure that a server socket is listening, and count this 244 * export while synchronized to prevent the server socket from 245 * being closed due to concurrent unexports. 246 */ 247 synchronized (this) { 248 listen(); 249 exportCount++; 250 } 251 252 /* 253 * Try to add the Target to the exported object table; keep 254 * counting this export (to keep server socket open) only if 255 * that succeeds. 256 */ 257 boolean ok = false; 258 try { 259 super.exportObject(target); 260 ok = true; 261 } finally { 262 if (!ok) { 263 synchronized (this) { 264 decrementExportCount(); 265 } 266 } 267 } 268 } 269 270 protected synchronized void targetUnexported() { 271 decrementExportCount(); 272 } 273 274 /** 275 * Decrements the count of exported objects, closing the current 276 * server socket if the count reaches zero. 277 **/ 278 private void decrementExportCount() { 279 assert Thread.holdsLock(this); 280 exportCount--; 281 if (exportCount == 0 && getEndpoint().getListenPort() != 0) { 282 ServerSocket ss = server; 283 server = null; 284 try { 285 ss.close(); 286 } catch (IOException e) { 287 } 288 } 289 } 290 291 /** 292 * Verify that the current access control context has permission to 293 * accept the connection being dispatched by the current thread. 294 */ 295 protected void checkAcceptPermission(AccessControlContext acc) { 296 SecurityManager sm = System.getSecurityManager(); 297 if (sm == null) { 298 return; 299 } 300 ConnectionHandler h = threadConnectionHandler.get(); 301 if (h == null) { 302 throw new Error( 303 "checkAcceptPermission not in ConnectionHandler thread"); 304 } 305 h.checkAcceptPermission(sm, acc); 306 } 307 308 private TCPEndpoint getEndpoint() { 309 synchronized (epList) { 310 return epList.getLast(); 311 } 312 } 313 314 /** 315 * Listen on transport's endpoint. 316 */ 317 private void listen() throws RemoteException { 318 assert Thread.holdsLock(this); 319 TCPEndpoint ep = getEndpoint(); 320 int port = ep.getPort(); 321 322 if (server == null) { 323 if (tcpLog.isLoggable(Log.BRIEF)) { 324 tcpLog.log(Log.BRIEF, 325 "(port " + port + ") create server socket"); 326 } 327 328 try { 329 server = ep.newServerSocket(); 330 /* 331 * Don't retry ServerSocket if creation fails since 332 * "port in use" will cause export to hang if an 333 * RMIFailureHandler is not installed. 334 */ 335 Thread t = AccessController.doPrivileged( 336 new NewThreadAction(new AcceptLoop(server), 337 "TCP Accept-" + port, true)); 338 t.start(); 339 } catch (java.net.BindException e) { 340 throw new ExportException("Port already in use: " + port, e); 341 } catch (IOException e) { 342 throw new ExportException("Listen failed on port: " + port, e); 343 } 344 345 } else { 346 // otherwise verify security access to existing server socket 347 SecurityManager sm = System.getSecurityManager(); 348 if (sm != null) { 349 sm.checkListen(port); 350 } 351 } 352 } 353 354 /** 355 * Worker for accepting connections from a server socket. 356 **/ 357 private class AcceptLoop implements Runnable { 358 359 private final ServerSocket serverSocket; 360 361 // state for throttling loop on exceptions (local to accept thread) 362 private long lastExceptionTime = 0L; 363 private int recentExceptionCount; 364 365 AcceptLoop(ServerSocket serverSocket) { 366 this.serverSocket = serverSocket; 367 } 368 369 public void run() { 370 try { 371 executeAcceptLoop(); 372 } finally { 373 try { 374 /* 375 * Only one accept loop is started per server 376 * socket, so after no more connections will be 377 * accepted, ensure that the server socket is no 378 * longer listening. 379 */ 380 serverSocket.close(); 381 } catch (IOException e) { 382 } 383 } 384 } 385 386 /** 387 * Accepts connections from the server socket and executes 388 * handlers for them in the thread pool. 389 **/ 390 private void executeAcceptLoop() { 391 if (tcpLog.isLoggable(Log.BRIEF)) { 392 tcpLog.log(Log.BRIEF, "listening on port " + 393 getEndpoint().getPort()); 394 } 395 396 while (true) { 397 Socket socket = null; 398 try { 399 socket = serverSocket.accept(); 400 401 /* 402 * Find client host name (or "0.0.0.0" if unknown) 403 */ 404 InetAddress clientAddr = socket.getInetAddress(); 405 String clientHost = (clientAddr != null 406 ? clientAddr.getHostAddress() 407 : "0.0.0.0"); 408 409 /* 410 * Execute connection handler in the thread pool, 411 * which uses non-system threads. 412 */ 413 try { 414 connectionThreadPool.execute( 415 new ConnectionHandler(socket, clientHost)); 416 } catch (RejectedExecutionException e) { 417 closeSocket(socket); 418 tcpLog.log(Log.BRIEF, 419 "rejected connection from " + clientHost); 420 } 421 422 } catch (Throwable t) { 423 try { 424 /* 425 * If the server socket has been closed, such 426 * as because there are no more exported 427 * objects, then we expect accept to throw an 428 * exception, so just terminate normally. 429 */ 430 if (serverSocket.isClosed()) { 431 break; 432 } 433 434 try { 435 if (tcpLog.isLoggable(Level.WARNING)) { 436 tcpLog.log(Level.WARNING, 437 "accept loop for " + serverSocket + 438 " throws", t); 439 } 440 } catch (Throwable tt) { 441 } 442 } finally { 443 /* 444 * Always close the accepted socket (if any) 445 * if an exception occurs, but only after 446 * logging an unexpected exception. 447 */ 448 if (socket != null) { 449 closeSocket(socket); 450 } 451 } 452 453 /* 454 * In case we're running out of file descriptors, 455 * release resources held in caches. 456 */ 457 if (!(t instanceof SecurityException)) { 458 try { 459 TCPEndpoint.shedConnectionCaches(); 460 } catch (Throwable tt) { 461 } 462 } 463 464 /* 465 * A NoClassDefFoundError can occur if no file 466 * descriptors are available, in which case this 467 * loop should not terminate. 468 */ 469 if (t instanceof Exception || 470 t instanceof OutOfMemoryError || 471 t instanceof NoClassDefFoundError) 472 { 473 if (!continueAfterAcceptFailure(t)) { 474 return; 475 } 476 // continue loop 477 } else if (t instanceof Error) { 478 throw (Error) t; 479 } else { 480 throw new UndeclaredThrowableException(t); 481 } 482 } 483 } 484 } 485 486 /** 487 * Returns true if the accept loop should continue after the 488 * specified exception has been caught, or false if the accept 489 * loop should terminate (closing the server socket). If 490 * there is an RMIFailureHandler, this method returns the 491 * result of passing the specified exception to it; otherwise, 492 * this method always returns true, after sleeping to throttle 493 * the accept loop if necessary. 494 **/ 495 private boolean continueAfterAcceptFailure(Throwable t) { 496 RMIFailureHandler fh = RMISocketFactory.getFailureHandler(); 497 if (fh != null) { 498 return fh.failure(t instanceof Exception ? (Exception) t : 499 new InvocationTargetException(t)); 500 } else { 501 throttleLoopOnException(); 502 return true; 503 } 504 } 505 506 /** 507 * Throttles the accept loop after an exception has been 508 * caught: if a burst of 10 exceptions in 5 seconds occurs, 509 * then wait for 10 seconds to curb busy CPU usage. 510 **/ 511 private void throttleLoopOnException() { 512 long now = System.currentTimeMillis(); 513 if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) { 514 // last exception was long ago (or this is the first) 515 lastExceptionTime = now; 516 recentExceptionCount = 0; 517 } else { 518 // exception burst window was started recently 519 if (++recentExceptionCount >= 10) { 520 try { 521 Thread.sleep(10000); 522 } catch (InterruptedException ignore) { 523 } 524 } 525 } 526 } 527 } 528 529 /** close socket and eat exception */ 530 private static void closeSocket(Socket sock) { 531 try { 532 sock.close(); 533 } catch (IOException ex) { 534 // eat exception 535 } 536 } 537 538 /** 539 * handleMessages decodes transport operations and handles messages 540 * appropriately. If an exception occurs during message handling, 541 * the socket is closed. 542 */ 543 void handleMessages(Connection conn, boolean persistent) { 544 int port = getEndpoint().getPort(); 545 546 try { 547 DataInputStream in = new DataInputStream(conn.getInputStream()); 548 do { 549 int op = in.read(); // transport op 550 if (op == -1) { 551 if (tcpLog.isLoggable(Log.BRIEF)) { 552 tcpLog.log(Log.BRIEF, "(port " + 553 port + ") connection closed"); 554 } 555 break; 556 } 557 558 if (tcpLog.isLoggable(Log.BRIEF)) { 559 tcpLog.log(Log.BRIEF, "(port " + port + 560 ") op = " + op); 561 } 562 563 switch (op) { 564 case TransportConstants.Call: 565 // service incoming RMI call 566 RemoteCall call = new StreamRemoteCall(conn); 567 if (serviceCall(call) == false) 568 return; 569 break; 570 571 case TransportConstants.Ping: 572 // send ack for ping 573 DataOutputStream out = 574 new DataOutputStream(conn.getOutputStream()); 575 out.writeByte(TransportConstants.PingAck); 576 conn.releaseOutputStream(); 577 break; 578 579 case TransportConstants.DGCAck: 580 DGCAckHandler.received(UID.read(in)); 581 break; 582 583 default: 584 throw new IOException("unknown transport op " + op); 585 } 586 } while (persistent); 587 588 } catch (IOException e) { 589 // exception during processing causes connection to close (below) 590 if (tcpLog.isLoggable(Log.BRIEF)) { 591 tcpLog.log(Log.BRIEF, "(port " + port + 592 ") exception: ", e); 593 } 594 } finally { 595 try { 596 conn.close(); 597 } catch (IOException ex) { 598 // eat exception 599 } 600 } 601 } 602 603 /** 604 * Returns the client host for the current thread's connection. Throws 605 * ServerNotActiveException if no connection is active for this thread. 606 */ 607 public static String getClientHost() throws ServerNotActiveException { 608 ConnectionHandler h = threadConnectionHandler.get(); 609 if (h != null) { 610 return h.getClientHost(); 611 } else { 612 throw new ServerNotActiveException("not in a remote call"); 613 } 614 } 615 616 /** 617 * Services messages on accepted connection 618 */ 619 private class ConnectionHandler implements Runnable { 620 621 /** int value of "POST" in ASCII (Java's specified data formats 622 * make this once-reviled tactic again socially acceptable) */ 623 private static final int POST = 0x504f5354; 624 625 /** most recently accept-authorized AccessControlContext */ 626 private AccessControlContext okContext; 627 /** cache of accept-authorized AccessControlContexts */ 628 private Map<AccessControlContext, 629 Reference<AccessControlContext>> authCache; 630 /** security manager which authorized contexts in authCache */ 631 private SecurityManager cacheSecurityManager = null; 632 633 private Socket socket; 634 private String remoteHost; 635 636 ConnectionHandler(Socket socket, String remoteHost) { 637 this.socket = socket; 638 this.remoteHost = remoteHost; 639 } 640 641 String getClientHost() { 642 return remoteHost; 643 } 644 645 /** 646 * Verify that the given AccessControlContext has permission to 647 * accept this connection. 648 */ 649 void checkAcceptPermission(SecurityManager sm, 650 AccessControlContext acc) 651 { 652 /* 653 * Note: no need to synchronize on cache-related fields, since this 654 * method only gets called from the ConnectionHandler's thread. 655 */ 656 if (sm != cacheSecurityManager) { 657 okContext = null; 658 authCache = new WeakHashMap<AccessControlContext, 659 Reference<AccessControlContext>>(); 660 cacheSecurityManager = sm; 661 } 662 if (acc.equals(okContext) || authCache.containsKey(acc)) { 663 return; 664 } 665 InetAddress addr = socket.getInetAddress(); 666 String host = (addr != null) ? addr.getHostAddress() : "*"; 667 668 sm.checkAccept(host, socket.getPort()); 669 670 authCache.put(acc, new SoftReference<AccessControlContext>(acc)); 671 okContext = acc; 672 } 673 674 public void run() { 675 Thread t = Thread.currentThread(); 676 String name = t.getName(); 677 try { 678 t.setName("RMI TCP Connection(" + 679 connectionCount.incrementAndGet() + 680 ")-" + remoteHost); 681 AccessController.doPrivileged((PrivilegedAction<Void>)() -> { 682 run0(); 683 return null; 684 }, NOPERMS_ACC); 685 } finally { 686 t.setName(name); 687 } 688 } 689 690 private void run0() { 691 TCPEndpoint endpoint = getEndpoint(); 692 int port = endpoint.getPort(); 693 694 threadConnectionHandler.set(this); 695 696 // set socket to disable Nagle's algorithm (always send 697 // immediately) 698 // TBD: should this be left up to socket factory instead? 699 try { 700 socket.setTcpNoDelay(true); 701 } catch (Exception e) { 702 // if we fail to set this, ignore and proceed anyway 703 } 704 // set socket to timeout after excessive idle time 705 try { 706 if (connectionReadTimeout > 0) 707 socket.setSoTimeout(connectionReadTimeout); 708 } catch (Exception e) { 709 // too bad, continue anyway 710 } 711 712 try { 713 InputStream sockIn = socket.getInputStream(); 714 InputStream bufIn = sockIn.markSupported() 715 ? sockIn 716 : new BufferedInputStream(sockIn); 717 718 // Read magic 719 DataInputStream in = new DataInputStream(bufIn); 720 int magic = in.readInt(); 721 722 // read and verify transport header 723 short version = in.readShort(); 724 if (magic != TransportConstants.Magic || 725 version != TransportConstants.Version) { 726 // protocol mismatch detected... 727 // just close socket: this would recurse if we marshal an 728 // exception to the client and the protocol at other end 729 // doesn't match. 730 closeSocket(socket); 731 return; 732 } 733 734 OutputStream sockOut = socket.getOutputStream(); 735 BufferedOutputStream bufOut = 736 new BufferedOutputStream(sockOut); 737 DataOutputStream out = new DataOutputStream(bufOut); 738 739 int remotePort = socket.getPort(); 740 741 if (tcpLog.isLoggable(Log.BRIEF)) { 742 tcpLog.log(Log.BRIEF, "accepted socket from [" + 743 remoteHost + ":" + remotePort + "]"); 744 } 745 746 TCPEndpoint ep; 747 TCPChannel ch; 748 TCPConnection conn; 749 750 // send ack (or nack) for protocol 751 byte protocol = in.readByte(); 752 switch (protocol) { 753 case TransportConstants.SingleOpProtocol: 754 // no ack for protocol 755 756 // create dummy channel for receiving messages 757 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 758 endpoint.getClientSocketFactory(), 759 endpoint.getServerSocketFactory()); 760 ch = new TCPChannel(TCPTransport.this, ep); 761 conn = new TCPConnection(ch, socket, bufIn, bufOut); 762 763 // read input messages 764 handleMessages(conn, false); 765 break; 766 767 case TransportConstants.StreamProtocol: 768 // send ack 769 out.writeByte(TransportConstants.ProtocolAck); 770 771 // suggest endpoint (in case client doesn't know host name) 772 if (tcpLog.isLoggable(Log.VERBOSE)) { 773 tcpLog.log(Log.VERBOSE, "(port " + port + 774 ") " + "suggesting " + remoteHost + ":" + 775 remotePort); 776 } 777 778 out.writeUTF(remoteHost); 779 out.writeInt(remotePort); 780 out.flush(); 781 782 // read and discard (possibly bogus) endpoint 783 // REMIND: would be faster to read 2 bytes then skip N+4 784 String clientHost = in.readUTF(); 785 int clientPort = in.readInt(); 786 if (tcpLog.isLoggable(Log.VERBOSE)) { 787 tcpLog.log(Log.VERBOSE, "(port " + port + 788 ") client using " + clientHost + ":" + clientPort); 789 } 790 791 // create dummy channel for receiving messages 792 // (why not use clientHost and clientPort?) 793 ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), 794 endpoint.getClientSocketFactory(), 795 endpoint.getServerSocketFactory()); 796 ch = new TCPChannel(TCPTransport.this, ep); 797 conn = new TCPConnection(ch, socket, bufIn, bufOut); 798 799 // read input messages 800 handleMessages(conn, true); 801 break; 802 803 case TransportConstants.MultiplexProtocol: 804 805 if (!enableMultiplexProtocol) { 806 if (tcpLog.isLoggable(Log.VERBOSE)) { 807 tcpLog.log(Log.VERBOSE, "(port " + port + 808 ") rejecting multiplex protocol"); 809 } 810 811 // If MultiplexProtocol is disabled, send NACK immediately. 812 out.writeByte(TransportConstants.ProtocolNack); 813 out.flush(); 814 break; 815 } 816 817 if (tcpLog.isLoggable(Log.VERBOSE)) { 818 tcpLog.log(Log.VERBOSE, "(port " + port + 819 ") accepting multiplex protocol"); 820 } 821 822 // send ack 823 out.writeByte(TransportConstants.ProtocolAck); 824 825 // suggest endpoint (in case client doesn't already have one) 826 if (tcpLog.isLoggable(Log.VERBOSE)) { 827 tcpLog.log(Log.VERBOSE, "(port " + port + 828 ") suggesting " + remoteHost + ":" + remotePort); 829 } 830 831 out.writeUTF(remoteHost); 832 out.writeInt(remotePort); 833 out.flush(); 834 835 // read endpoint client has decided to use 836 ep = new TCPEndpoint(in.readUTF(), in.readInt(), 837 endpoint.getClientSocketFactory(), 838 endpoint.getServerSocketFactory()); 839 if (tcpLog.isLoggable(Log.VERBOSE)) { 840 tcpLog.log(Log.VERBOSE, "(port " + 841 port + ") client using " + 842 ep.getHost() + ":" + ep.getPort()); 843 } 844 845 ConnectionMultiplexer multiplexer; 846 synchronized (channelTable) { 847 // create or find channel for this endpoint 848 ch = getChannel(ep); 849 multiplexer = 850 new ConnectionMultiplexer(ch, bufIn, sockOut, 851 false); 852 ch.useMultiplexer(multiplexer); 853 } 854 multiplexer.run(); 855 break; 856 857 default: 858 // protocol not understood, send nack and close socket 859 out.writeByte(TransportConstants.ProtocolNack); 860 out.flush(); 861 break; 862 } 863 864 } catch (IOException e) { 865 // socket in unknown state: destroy socket 866 tcpLog.log(Log.BRIEF, "terminated with exception:", e); 867 } finally { 868 closeSocket(socket); 869 } 870 } 871 } 872} 873