1/*
2 * Copyright (c) 1996, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package sun.rmi.transport.tcp;
26
27import java.lang.ref.Reference;
28import java.lang.ref.SoftReference;
29import java.lang.ref.WeakReference;
30import java.lang.reflect.InvocationTargetException;
31import java.lang.reflect.UndeclaredThrowableException;
32import java.io.DataInputStream;
33import java.io.DataOutputStream;
34import java.io.IOException;
35import java.io.InputStream;
36import java.io.OutputStream;
37import java.io.BufferedInputStream;
38import java.io.BufferedOutputStream;
39import java.net.InetAddress;
40import java.net.ServerSocket;
41import java.net.Socket;
42import java.rmi.RemoteException;
43import java.rmi.server.ExportException;
44import java.rmi.server.LogStream;
45import java.rmi.server.RMIFailureHandler;
46import java.rmi.server.RMISocketFactory;
47import java.rmi.server.RemoteCall;
48import java.rmi.server.ServerNotActiveException;
49import java.rmi.server.UID;
50import java.security.AccessControlContext;
51import java.security.AccessController;
52import java.security.Permissions;
53import java.security.PrivilegedAction;
54import java.security.ProtectionDomain;
55import java.util.ArrayList;
56import java.util.LinkedList;
57import java.util.List;
58import java.util.Map;
59import java.util.WeakHashMap;
60import java.util.logging.Level;
61import java.util.concurrent.ExecutorService;
62import java.util.concurrent.RejectedExecutionException;
63import java.util.concurrent.SynchronousQueue;
64import java.util.concurrent.ThreadFactory;
65import java.util.concurrent.ThreadPoolExecutor;
66import java.util.concurrent.TimeUnit;
67import java.util.concurrent.atomic.AtomicInteger;
68import sun.rmi.runtime.Log;
69import sun.rmi.runtime.NewThreadAction;
70import sun.rmi.transport.Channel;
71import sun.rmi.transport.Connection;
72import sun.rmi.transport.DGCAckHandler;
73import sun.rmi.transport.Endpoint;
74import sun.rmi.transport.StreamRemoteCall;
75import sun.rmi.transport.Target;
76import sun.rmi.transport.Transport;
77import sun.rmi.transport.TransportConstants;
78
79/**
80 * TCPTransport is the socket-based implementation of the RMI Transport
81 * abstraction.
82 *
83 * @author Ann Wollrath
84 * @author Peter Jones
85 */
86@SuppressWarnings("deprecation")
87public class TCPTransport extends Transport {
88
89    /* tcp package log */
90    static final Log tcpLog = Log.getLog("sun.rmi.transport.tcp", "tcp",
91        LogStream.parseLevel(AccessController.doPrivileged(
92            (PrivilegedAction<String>) () -> System.getProperty("sun.rmi.transport.tcp.logLevel"))));
93
94    /** maximum number of connection handler threads */
95    private static final int maxConnectionThreads =     // default no limit
96        AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
97            Integer.getInteger("sun.rmi.transport.tcp.maxConnectionThreads",
98                               Integer.MAX_VALUE));
99
100    /** keep alive time for idle connection handler threads */
101    private static final long threadKeepAliveTime =     // default 1 minute
102        AccessController.doPrivileged((PrivilegedAction<Long>) () ->
103            Long.getLong("sun.rmi.transport.tcp.threadKeepAliveTime", 60000));
104
105    /** enable multiplexing protocol */
106    private static final boolean enableMultiplexProtocol =     // default false
107            AccessController.doPrivileged((PrivilegedAction<Boolean>) () ->
108                    Boolean.getBoolean("sun.rmi.transport.tcp.enableMultiplexProtocol"));
109
110    /** thread pool for connection handlers */
111    private static final ExecutorService connectionThreadPool =
112        new ThreadPoolExecutor(0, maxConnectionThreads,
113            threadKeepAliveTime, TimeUnit.MILLISECONDS,
114            new SynchronousQueue<Runnable>(),
115            new ThreadFactory() {
116                public Thread newThread(Runnable runnable) {
117                    return AccessController.doPrivileged(new NewThreadAction(
118                        runnable, "TCP Connection(idle)", true, true));
119                }
120            });
121
122    /** total connections handled */
123    private static final AtomicInteger connectionCount = new AtomicInteger(0);
124
125    /** client host for the current thread's connection */
126    private static final ThreadLocal<ConnectionHandler>
127        threadConnectionHandler = new ThreadLocal<>();
128
129    /** an AccessControlContext with no permissions */
130    private static final AccessControlContext NOPERMS_ACC;
131    static {
132        Permissions perms = new Permissions();
133        ProtectionDomain[] pd = { new ProtectionDomain(null, perms) };
134        NOPERMS_ACC = new AccessControlContext(pd);
135    }
136
137    /** endpoints for this transport */
138    private final LinkedList<TCPEndpoint> epList;
139    /** number of objects exported on this transport */
140    private int exportCount = 0;
141    /** server socket for this transport */
142    private ServerSocket server = null;
143    /** table mapping endpoints to channels */
144    private final Map<TCPEndpoint,Reference<TCPChannel>> channelTable =
145        new WeakHashMap<>();
146
147    static final RMISocketFactory defaultSocketFactory =
148        RMISocketFactory.getDefaultSocketFactory();
149
150    /** number of milliseconds in accepted-connection timeout.
151     * Warning: this should be greater than 15 seconds (the client-side
152     * timeout), and defaults to 2 hours.
153     * The maximum representable value is slightly more than 24 days
154     * and 20 hours.
155     */
156    private static final int connectionReadTimeout =    // default 2 hours
157        AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
158            Integer.getInteger("sun.rmi.transport.tcp.readTimeout", 2 * 3600 * 1000));
159
160    /**
161     * Constructs a TCPTransport.
162     */
163    TCPTransport(LinkedList<TCPEndpoint> epList)  {
164        // assert ((epList.size() != null) && (epList.size() >= 1))
165        this.epList = epList;
166        if (tcpLog.isLoggable(Log.BRIEF)) {
167            tcpLog.log(Log.BRIEF, "Version = " +
168                TransportConstants.Version + ", ep = " + getEndpoint());
169        }
170    }
171
172    /**
173     * Closes all cached connections in every channel subordinated to this
174     * transport.  Currently, this only closes outgoing connections.
175     */
176    public void shedConnectionCaches() {
177        List<TCPChannel> channels;
178        synchronized (channelTable) {
179            channels = new ArrayList<TCPChannel>(channelTable.values().size());
180            for (Reference<TCPChannel> ref : channelTable.values()) {
181                TCPChannel ch = ref.get();
182                if (ch != null) {
183                    channels.add(ch);
184                }
185            }
186        }
187        for (TCPChannel channel : channels) {
188            channel.shedCache();
189        }
190    }
191
192    /**
193     * Returns a <I>Channel</I> that generates connections to the
194     * endpoint <I>ep</I>. A Channel is an object that creates and
195     * manages connections of a particular type to some particular
196     * address space.
197     * @param ep the endpoint to which connections will be generated.
198     * @return the channel or null if the transport cannot
199     * generate connections to this endpoint
200     */
201    public TCPChannel getChannel(Endpoint ep) {
202        TCPChannel ch = null;
203        if (ep instanceof TCPEndpoint) {
204            synchronized (channelTable) {
205                Reference<TCPChannel> ref = channelTable.get(ep);
206                if (ref != null) {
207                    ch = ref.get();
208                }
209                if (ch == null) {
210                    TCPEndpoint tcpEndpoint = (TCPEndpoint) ep;
211                    ch = new TCPChannel(this, tcpEndpoint);
212                    channelTable.put(tcpEndpoint,
213                                     new WeakReference<TCPChannel>(ch));
214                }
215            }
216        }
217        return ch;
218    }
219
220    /**
221     * Removes the <I>Channel</I> that generates connections to the
222     * endpoint <I>ep</I>.
223     */
224    public void free(Endpoint ep) {
225        if (ep instanceof TCPEndpoint) {
226            synchronized (channelTable) {
227                Reference<TCPChannel> ref = channelTable.remove(ep);
228                if (ref != null) {
229                    TCPChannel channel = ref.get();
230                    if (channel != null) {
231                        channel.shedCache();
232                    }
233                }
234            }
235        }
236    }
237
238    /**
239     * Export the object so that it can accept incoming calls.
240     */
241    public void exportObject(Target target) throws RemoteException {
242        /*
243         * Ensure that a server socket is listening, and count this
244         * export while synchronized to prevent the server socket from
245         * being closed due to concurrent unexports.
246         */
247        synchronized (this) {
248            listen();
249            exportCount++;
250        }
251
252        /*
253         * Try to add the Target to the exported object table; keep
254         * counting this export (to keep server socket open) only if
255         * that succeeds.
256         */
257        boolean ok = false;
258        try {
259            super.exportObject(target);
260            ok = true;
261        } finally {
262            if (!ok) {
263                synchronized (this) {
264                    decrementExportCount();
265                }
266            }
267        }
268    }
269
270    protected synchronized void targetUnexported() {
271        decrementExportCount();
272    }
273
274    /**
275     * Decrements the count of exported objects, closing the current
276     * server socket if the count reaches zero.
277     **/
278    private void decrementExportCount() {
279        assert Thread.holdsLock(this);
280        exportCount--;
281        if (exportCount == 0 && getEndpoint().getListenPort() != 0) {
282            ServerSocket ss = server;
283            server = null;
284            try {
285                ss.close();
286            } catch (IOException e) {
287            }
288        }
289    }
290
291    /**
292     * Verify that the current access control context has permission to
293     * accept the connection being dispatched by the current thread.
294     */
295    protected void checkAcceptPermission(AccessControlContext acc) {
296        SecurityManager sm = System.getSecurityManager();
297        if (sm == null) {
298            return;
299        }
300        ConnectionHandler h = threadConnectionHandler.get();
301        if (h == null) {
302            throw new Error(
303                "checkAcceptPermission not in ConnectionHandler thread");
304        }
305        h.checkAcceptPermission(sm, acc);
306    }
307
308    private TCPEndpoint getEndpoint() {
309        synchronized (epList) {
310            return epList.getLast();
311        }
312    }
313
314    /**
315     * Listen on transport's endpoint.
316     */
317    private void listen() throws RemoteException {
318        assert Thread.holdsLock(this);
319        TCPEndpoint ep = getEndpoint();
320        int port = ep.getPort();
321
322        if (server == null) {
323            if (tcpLog.isLoggable(Log.BRIEF)) {
324                tcpLog.log(Log.BRIEF,
325                    "(port " + port + ") create server socket");
326            }
327
328            try {
329                server = ep.newServerSocket();
330                /*
331                 * Don't retry ServerSocket if creation fails since
332                 * "port in use" will cause export to hang if an
333                 * RMIFailureHandler is not installed.
334                 */
335                Thread t = AccessController.doPrivileged(
336                    new NewThreadAction(new AcceptLoop(server),
337                                        "TCP Accept-" + port, true));
338                t.start();
339            } catch (java.net.BindException e) {
340                throw new ExportException("Port already in use: " + port, e);
341            } catch (IOException e) {
342                throw new ExportException("Listen failed on port: " + port, e);
343            }
344
345        } else {
346            // otherwise verify security access to existing server socket
347            SecurityManager sm = System.getSecurityManager();
348            if (sm != null) {
349                sm.checkListen(port);
350            }
351        }
352    }
353
354    /**
355     * Worker for accepting connections from a server socket.
356     **/
357    private class AcceptLoop implements Runnable {
358
359        private final ServerSocket serverSocket;
360
361        // state for throttling loop on exceptions (local to accept thread)
362        private long lastExceptionTime = 0L;
363        private int recentExceptionCount;
364
365        AcceptLoop(ServerSocket serverSocket) {
366            this.serverSocket = serverSocket;
367        }
368
369        public void run() {
370            try {
371                executeAcceptLoop();
372            } finally {
373                try {
374                    /*
375                     * Only one accept loop is started per server
376                     * socket, so after no more connections will be
377                     * accepted, ensure that the server socket is no
378                     * longer listening.
379                     */
380                    serverSocket.close();
381                } catch (IOException e) {
382                }
383            }
384        }
385
386        /**
387         * Accepts connections from the server socket and executes
388         * handlers for them in the thread pool.
389         **/
390        private void executeAcceptLoop() {
391            if (tcpLog.isLoggable(Log.BRIEF)) {
392                tcpLog.log(Log.BRIEF, "listening on port " +
393                           getEndpoint().getPort());
394            }
395
396            while (true) {
397                Socket socket = null;
398                try {
399                    socket = serverSocket.accept();
400
401                    /*
402                     * Find client host name (or "0.0.0.0" if unknown)
403                     */
404                    InetAddress clientAddr = socket.getInetAddress();
405                    String clientHost = (clientAddr != null
406                                         ? clientAddr.getHostAddress()
407                                         : "0.0.0.0");
408
409                    /*
410                     * Execute connection handler in the thread pool,
411                     * which uses non-system threads.
412                     */
413                    try {
414                        connectionThreadPool.execute(
415                            new ConnectionHandler(socket, clientHost));
416                    } catch (RejectedExecutionException e) {
417                        closeSocket(socket);
418                        tcpLog.log(Log.BRIEF,
419                                   "rejected connection from " + clientHost);
420                    }
421
422                } catch (Throwable t) {
423                    try {
424                        /*
425                         * If the server socket has been closed, such
426                         * as because there are no more exported
427                         * objects, then we expect accept to throw an
428                         * exception, so just terminate normally.
429                         */
430                        if (serverSocket.isClosed()) {
431                            break;
432                        }
433
434                        try {
435                            if (tcpLog.isLoggable(Level.WARNING)) {
436                                tcpLog.log(Level.WARNING,
437                                           "accept loop for " + serverSocket +
438                                           " throws", t);
439                            }
440                        } catch (Throwable tt) {
441                        }
442                    } finally {
443                        /*
444                         * Always close the accepted socket (if any)
445                         * if an exception occurs, but only after
446                         * logging an unexpected exception.
447                         */
448                        if (socket != null) {
449                            closeSocket(socket);
450                        }
451                    }
452
453                    /*
454                     * In case we're running out of file descriptors,
455                     * release resources held in caches.
456                     */
457                    if (!(t instanceof SecurityException)) {
458                        try {
459                            TCPEndpoint.shedConnectionCaches();
460                        } catch (Throwable tt) {
461                        }
462                    }
463
464                    /*
465                     * A NoClassDefFoundError can occur if no file
466                     * descriptors are available, in which case this
467                     * loop should not terminate.
468                     */
469                    if (t instanceof Exception ||
470                        t instanceof OutOfMemoryError ||
471                        t instanceof NoClassDefFoundError)
472                    {
473                        if (!continueAfterAcceptFailure(t)) {
474                            return;
475                        }
476                        // continue loop
477                    } else if (t instanceof Error) {
478                        throw (Error) t;
479                    } else {
480                        throw new UndeclaredThrowableException(t);
481                    }
482                }
483            }
484        }
485
486        /**
487         * Returns true if the accept loop should continue after the
488         * specified exception has been caught, or false if the accept
489         * loop should terminate (closing the server socket).  If
490         * there is an RMIFailureHandler, this method returns the
491         * result of passing the specified exception to it; otherwise,
492         * this method always returns true, after sleeping to throttle
493         * the accept loop if necessary.
494         **/
495        private boolean continueAfterAcceptFailure(Throwable t) {
496            RMIFailureHandler fh = RMISocketFactory.getFailureHandler();
497            if (fh != null) {
498                return fh.failure(t instanceof Exception ? (Exception) t :
499                                  new InvocationTargetException(t));
500            } else {
501                throttleLoopOnException();
502                return true;
503            }
504        }
505
506        /**
507         * Throttles the accept loop after an exception has been
508         * caught: if a burst of 10 exceptions in 5 seconds occurs,
509         * then wait for 10 seconds to curb busy CPU usage.
510         **/
511        private void throttleLoopOnException() {
512            long now = System.currentTimeMillis();
513            if (lastExceptionTime == 0L || (now - lastExceptionTime) > 5000) {
514                // last exception was long ago (or this is the first)
515                lastExceptionTime = now;
516                recentExceptionCount = 0;
517            } else {
518                // exception burst window was started recently
519                if (++recentExceptionCount >= 10) {
520                    try {
521                        Thread.sleep(10000);
522                    } catch (InterruptedException ignore) {
523                    }
524                }
525            }
526        }
527    }
528
529    /** close socket and eat exception */
530    private static void closeSocket(Socket sock) {
531        try {
532            sock.close();
533        } catch (IOException ex) {
534            // eat exception
535        }
536    }
537
538    /**
539     * handleMessages decodes transport operations and handles messages
540     * appropriately.  If an exception occurs during message handling,
541     * the socket is closed.
542     */
543    void handleMessages(Connection conn, boolean persistent) {
544        int port = getEndpoint().getPort();
545
546        try {
547            DataInputStream in = new DataInputStream(conn.getInputStream());
548            do {
549                int op = in.read();     // transport op
550                if (op == -1) {
551                    if (tcpLog.isLoggable(Log.BRIEF)) {
552                        tcpLog.log(Log.BRIEF, "(port " +
553                            port + ") connection closed");
554                    }
555                    break;
556                }
557
558                if (tcpLog.isLoggable(Log.BRIEF)) {
559                    tcpLog.log(Log.BRIEF, "(port " + port +
560                        ") op = " + op);
561                }
562
563                switch (op) {
564                case TransportConstants.Call:
565                    // service incoming RMI call
566                    RemoteCall call = new StreamRemoteCall(conn);
567                    if (serviceCall(call) == false)
568                        return;
569                    break;
570
571                case TransportConstants.Ping:
572                    // send ack for ping
573                    DataOutputStream out =
574                        new DataOutputStream(conn.getOutputStream());
575                    out.writeByte(TransportConstants.PingAck);
576                    conn.releaseOutputStream();
577                    break;
578
579                case TransportConstants.DGCAck:
580                    DGCAckHandler.received(UID.read(in));
581                    break;
582
583                default:
584                    throw new IOException("unknown transport op " + op);
585                }
586            } while (persistent);
587
588        } catch (IOException e) {
589            // exception during processing causes connection to close (below)
590            if (tcpLog.isLoggable(Log.BRIEF)) {
591                tcpLog.log(Log.BRIEF, "(port " + port +
592                    ") exception: ", e);
593            }
594        } finally {
595            try {
596                conn.close();
597            } catch (IOException ex) {
598                // eat exception
599            }
600        }
601    }
602
603    /**
604     * Returns the client host for the current thread's connection.  Throws
605     * ServerNotActiveException if no connection is active for this thread.
606     */
607    public static String getClientHost() throws ServerNotActiveException {
608        ConnectionHandler h = threadConnectionHandler.get();
609        if (h != null) {
610            return h.getClientHost();
611        } else {
612            throw new ServerNotActiveException("not in a remote call");
613        }
614    }
615
616    /**
617     * Services messages on accepted connection
618     */
619    private class ConnectionHandler implements Runnable {
620
621        /** int value of "POST" in ASCII (Java's specified data formats
622         *  make this once-reviled tactic again socially acceptable) */
623        private static final int POST = 0x504f5354;
624
625        /** most recently accept-authorized AccessControlContext */
626        private AccessControlContext okContext;
627        /** cache of accept-authorized AccessControlContexts */
628        private Map<AccessControlContext,
629                    Reference<AccessControlContext>> authCache;
630        /** security manager which authorized contexts in authCache */
631        private SecurityManager cacheSecurityManager = null;
632
633        private Socket socket;
634        private String remoteHost;
635
636        ConnectionHandler(Socket socket, String remoteHost) {
637            this.socket = socket;
638            this.remoteHost = remoteHost;
639        }
640
641        String getClientHost() {
642            return remoteHost;
643        }
644
645        /**
646         * Verify that the given AccessControlContext has permission to
647         * accept this connection.
648         */
649        void checkAcceptPermission(SecurityManager sm,
650                                   AccessControlContext acc)
651        {
652            /*
653             * Note: no need to synchronize on cache-related fields, since this
654             * method only gets called from the ConnectionHandler's thread.
655             */
656            if (sm != cacheSecurityManager) {
657                okContext = null;
658                authCache = new WeakHashMap<AccessControlContext,
659                                            Reference<AccessControlContext>>();
660                cacheSecurityManager = sm;
661            }
662            if (acc.equals(okContext) || authCache.containsKey(acc)) {
663                return;
664            }
665            InetAddress addr = socket.getInetAddress();
666            String host = (addr != null) ? addr.getHostAddress() : "*";
667
668            sm.checkAccept(host, socket.getPort());
669
670            authCache.put(acc, new SoftReference<AccessControlContext>(acc));
671            okContext = acc;
672        }
673
674        public void run() {
675            Thread t = Thread.currentThread();
676            String name = t.getName();
677            try {
678                t.setName("RMI TCP Connection(" +
679                          connectionCount.incrementAndGet() +
680                          ")-" + remoteHost);
681                AccessController.doPrivileged((PrivilegedAction<Void>)() -> {
682                    run0();
683                    return null;
684                }, NOPERMS_ACC);
685            } finally {
686                t.setName(name);
687            }
688        }
689
690        private void run0() {
691            TCPEndpoint endpoint = getEndpoint();
692            int port = endpoint.getPort();
693
694            threadConnectionHandler.set(this);
695
696            // set socket to disable Nagle's algorithm (always send
697            // immediately)
698            // TBD: should this be left up to socket factory instead?
699            try {
700                socket.setTcpNoDelay(true);
701            } catch (Exception e) {
702                // if we fail to set this, ignore and proceed anyway
703            }
704            // set socket to timeout after excessive idle time
705            try {
706                if (connectionReadTimeout > 0)
707                    socket.setSoTimeout(connectionReadTimeout);
708            } catch (Exception e) {
709                // too bad, continue anyway
710            }
711
712            try {
713                InputStream sockIn = socket.getInputStream();
714                InputStream bufIn = sockIn.markSupported()
715                        ? sockIn
716                        : new BufferedInputStream(sockIn);
717
718                // Read magic
719                DataInputStream in = new DataInputStream(bufIn);
720                int magic = in.readInt();
721
722                // read and verify transport header
723                short version = in.readShort();
724                if (magic != TransportConstants.Magic ||
725                    version != TransportConstants.Version) {
726                    // protocol mismatch detected...
727                    // just close socket: this would recurse if we marshal an
728                    // exception to the client and the protocol at other end
729                    // doesn't match.
730                    closeSocket(socket);
731                    return;
732                }
733
734                OutputStream sockOut = socket.getOutputStream();
735                BufferedOutputStream bufOut =
736                    new BufferedOutputStream(sockOut);
737                DataOutputStream out = new DataOutputStream(bufOut);
738
739                int remotePort = socket.getPort();
740
741                if (tcpLog.isLoggable(Log.BRIEF)) {
742                    tcpLog.log(Log.BRIEF, "accepted socket from [" +
743                                     remoteHost + ":" + remotePort + "]");
744                }
745
746                TCPEndpoint ep;
747                TCPChannel ch;
748                TCPConnection conn;
749
750                // send ack (or nack) for protocol
751                byte protocol = in.readByte();
752                switch (protocol) {
753                case TransportConstants.SingleOpProtocol:
754                    // no ack for protocol
755
756                    // create dummy channel for receiving messages
757                    ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
758                                         endpoint.getClientSocketFactory(),
759                                         endpoint.getServerSocketFactory());
760                    ch = new TCPChannel(TCPTransport.this, ep);
761                    conn = new TCPConnection(ch, socket, bufIn, bufOut);
762
763                    // read input messages
764                    handleMessages(conn, false);
765                    break;
766
767                case TransportConstants.StreamProtocol:
768                    // send ack
769                    out.writeByte(TransportConstants.ProtocolAck);
770
771                    // suggest endpoint (in case client doesn't know host name)
772                    if (tcpLog.isLoggable(Log.VERBOSE)) {
773                        tcpLog.log(Log.VERBOSE, "(port " + port +
774                            ") " + "suggesting " + remoteHost + ":" +
775                            remotePort);
776                    }
777
778                    out.writeUTF(remoteHost);
779                    out.writeInt(remotePort);
780                    out.flush();
781
782                    // read and discard (possibly bogus) endpoint
783                    // REMIND: would be faster to read 2 bytes then skip N+4
784                    String clientHost = in.readUTF();
785                    int    clientPort = in.readInt();
786                    if (tcpLog.isLoggable(Log.VERBOSE)) {
787                        tcpLog.log(Log.VERBOSE, "(port " + port +
788                            ") client using " + clientHost + ":" + clientPort);
789                    }
790
791                    // create dummy channel for receiving messages
792                    // (why not use clientHost and clientPort?)
793                    ep = new TCPEndpoint(remoteHost, socket.getLocalPort(),
794                                         endpoint.getClientSocketFactory(),
795                                         endpoint.getServerSocketFactory());
796                    ch = new TCPChannel(TCPTransport.this, ep);
797                    conn = new TCPConnection(ch, socket, bufIn, bufOut);
798
799                    // read input messages
800                    handleMessages(conn, true);
801                    break;
802
803                case TransportConstants.MultiplexProtocol:
804
805                    if (!enableMultiplexProtocol) {
806                        if (tcpLog.isLoggable(Log.VERBOSE)) {
807                            tcpLog.log(Log.VERBOSE, "(port " + port +
808                                    ") rejecting multiplex protocol");
809                        }
810
811                        // If MultiplexProtocol is disabled, send NACK immediately.
812                        out.writeByte(TransportConstants.ProtocolNack);
813                        out.flush();
814                        break;
815                    }
816
817                    if (tcpLog.isLoggable(Log.VERBOSE)) {
818                        tcpLog.log(Log.VERBOSE, "(port " + port +
819                            ") accepting multiplex protocol");
820                    }
821
822                    // send ack
823                    out.writeByte(TransportConstants.ProtocolAck);
824
825                    // suggest endpoint (in case client doesn't already have one)
826                    if (tcpLog.isLoggable(Log.VERBOSE)) {
827                        tcpLog.log(Log.VERBOSE, "(port " + port +
828                            ") suggesting " + remoteHost + ":" + remotePort);
829                    }
830
831                    out.writeUTF(remoteHost);
832                    out.writeInt(remotePort);
833                    out.flush();
834
835                    // read endpoint client has decided to use
836                    ep = new TCPEndpoint(in.readUTF(), in.readInt(),
837                                         endpoint.getClientSocketFactory(),
838                                         endpoint.getServerSocketFactory());
839                    if (tcpLog.isLoggable(Log.VERBOSE)) {
840                        tcpLog.log(Log.VERBOSE, "(port " +
841                            port + ") client using " +
842                            ep.getHost() + ":" + ep.getPort());
843                    }
844
845                    ConnectionMultiplexer multiplexer;
846                    synchronized (channelTable) {
847                        // create or find channel for this endpoint
848                        ch = getChannel(ep);
849                        multiplexer =
850                            new ConnectionMultiplexer(ch, bufIn, sockOut,
851                                                      false);
852                        ch.useMultiplexer(multiplexer);
853                    }
854                    multiplexer.run();
855                    break;
856
857                default:
858                    // protocol not understood, send nack and close socket
859                    out.writeByte(TransportConstants.ProtocolNack);
860                    out.flush();
861                    break;
862                }
863
864            } catch (IOException e) {
865                // socket in unknown state: destroy socket
866                tcpLog.log(Log.BRIEF, "terminated with exception:", e);
867            } finally {
868                closeSocket(socket);
869            }
870        }
871    }
872}
873