1/*
2 * Copyright (c) 1999, 2014, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.jndi.ldap;
27
28import java.io.BufferedInputStream;
29import java.io.BufferedOutputStream;
30import java.io.InterruptedIOException;
31import java.io.IOException;
32import java.io.OutputStream;
33import java.io.InputStream;
34import java.net.InetSocketAddress;
35import java.net.Socket;
36import javax.net.ssl.SSLSocket;
37
38import javax.naming.CommunicationException;
39import javax.naming.ServiceUnavailableException;
40import javax.naming.NamingException;
41import javax.naming.InterruptedNamingException;
42
43import javax.naming.ldap.Control;
44
45import java.lang.reflect.Method;
46import java.lang.reflect.InvocationTargetException;
47import java.util.Arrays;
48import javax.net.SocketFactory;
49
50/**
51  * A thread that creates a connection to an LDAP server.
52  * After the connection, the thread reads from the connection.
53  * A caller can invoke methods on the instance to read LDAP responses
54  * and to send LDAP requests.
55  * <p>
56  * There is a one-to-one correspondence between an LdapClient and
57  * a Connection. Access to Connection and its methods is only via
58  * LdapClient with two exceptions: SASL authentication and StartTLS.
59  * SASL needs to access Connection's socket IO streams (in order to do encryption
60  * of the security layer). StartTLS needs to do replace IO streams
61  * and close the IO  streams on nonfatal close. The code for SASL
62  * authentication can be treated as being the same as from LdapClient
63  * because the SASL code is only ever called from LdapClient, from
64  * inside LdapClient's synchronized authenticate() method. StartTLS is called
65  * directly by the application but should only occur when the underlying
66  * connection is quiet.
67  * <p>
68  * In terms of synchronization, worry about data structures
69  * used by the Connection thread because that usage might contend
70  * with calls by the main threads (i.e., those that call LdapClient).
71  * Main threads need to worry about contention with each other.
72  * Fields that Connection thread uses:
73  *     inStream - synced access and update; initialized in constructor;
74  *           referenced outside class unsync'ed (by LdapSasl) only
75  *           when connection is quiet
76  *     traceFile, traceTagIn, traceTagOut - no sync; debugging only
77  *     parent - no sync; initialized in constructor; no updates
78  *     pendingRequests - sync
79  *     pauseLock - per-instance lock;
80  *     paused - sync via pauseLock (pauseReader())
81  * Members used by main threads (LdapClient):
82  *     host, port - unsync; read-only access for StartTLS and debug messages
83  *     setBound(), setV3() - no sync; called only by LdapClient.authenticate(),
84  *             which is a sync method called only when connection is "quiet"
85  *     getMsgId() - sync
86  *     writeRequest(), removeRequest(),findRequest(), abandonOutstandingReqs() -
87  *             access to shared pendingRequests is sync
88  *     writeRequest(),  abandonRequest(), ldapUnbind() - access to outStream sync
89  *     cleanup() - sync
90  *     readReply() - access to sock sync
91  *     unpauseReader() - (indirectly via writeRequest) sync on pauseLock
92  * Members used by SASL auth (main thread):
93  *     inStream, outStream - no sync; used to construct new stream; accessed
94  *             only when conn is "quiet" and not shared
95  *     replaceStreams() - sync method
96  * Members used by StartTLS:
97  *     inStream, outStream - no sync; used to record the existing streams;
98  *             accessed only when conn is "quiet" and not shared
99  *     replaceStreams() - sync method
100  * <p>
101  * Handles anonymous, simple, and SASL bind for v3; anonymous and simple
102  * for v2.
103  * %%% made public for access by LdapSasl %%%
104  *
105  * @author Vincent Ryan
106  * @author Rosanna Lee
107  * @author Jagane Sundar
108  */
109public final class Connection implements Runnable {
110
111    private static final boolean debug = false;
112    private static final int dump = 0; // > 0 r, > 1 rw
113
114
115    final private Thread worker;    // Initialized in constructor
116
117    private boolean v3 = true;       // Set in setV3()
118
119    final public String host;  // used by LdapClient for generating exception messages
120                         // used by StartTlsResponse when creating an SSL socket
121    final public int port;     // used by LdapClient for generating exception messages
122                         // used by StartTlsResponse when creating an SSL socket
123
124    private boolean bound = false;   // Set in setBound()
125
126    // All three are initialized in constructor and read-only afterwards
127    private OutputStream traceFile = null;
128    private String traceTagIn = null;
129    private String traceTagOut = null;
130
131    // Initialized in constructor; read and used externally (LdapSasl);
132    // Updated in replaceStreams() during "quiet", unshared, period
133    public InputStream inStream;   // must be public; used by LdapSasl
134
135    // Initialized in constructor; read and used externally (LdapSasl);
136    // Updated in replaceOutputStream() during "quiet", unshared, period
137    public OutputStream outStream; // must be public; used by LdapSasl
138
139    // Initialized in constructor; read and used externally (TLS) to
140    // get new IO streams; closed during cleanup
141    public Socket sock;            // for TLS
142
143    // For processing "disconnect" unsolicited notification
144    // Initialized in constructor
145    final private LdapClient parent;
146
147    // Incremented and returned in sync getMsgId()
148    private int outMsgId = 0;
149
150    //
151    // The list of ldapRequests pending on this binding
152    //
153    // Accessed only within sync methods
154    private LdapRequest pendingRequests = null;
155
156    volatile IOException closureReason = null;
157    volatile boolean useable = true;  // is Connection still useable
158
159    int readTimeout;
160    int connectTimeout;
161
162    // true means v3; false means v2
163    // Called in LdapClient.authenticate() (which is synchronized)
164    // when connection is "quiet" and not shared; no need to synchronize
165    void setV3(boolean v) {
166        v3 = v;
167    }
168
169    // A BIND request has been successfully made on this connection
170    // When cleaning up, remember to do an UNBIND
171    // Called in LdapClient.authenticate() (which is synchronized)
172    // when connection is "quiet" and not shared; no need to synchronize
173    void setBound() {
174        bound = true;
175    }
176
177    ////////////////////////////////////////////////////////////////////////////
178    //
179    // Create an LDAP Binding object and bind to a particular server
180    //
181    ////////////////////////////////////////////////////////////////////////////
182
183    Connection(LdapClient parent, String host, int port, String socketFactory,
184        int connectTimeout, int readTimeout, OutputStream trace) throws NamingException {
185
186        this.host = host;
187        this.port = port;
188        this.parent = parent;
189        this.readTimeout = readTimeout;
190        this.connectTimeout = connectTimeout;
191
192        if (trace != null) {
193            traceFile = trace;
194            traceTagIn = "<- " + host + ":" + port + "\n\n";
195            traceTagOut = "-> " + host + ":" + port + "\n\n";
196        }
197
198        //
199        // Connect to server
200        //
201        try {
202            sock = createSocket(host, port, socketFactory, connectTimeout);
203
204            if (debug) {
205                System.err.println("Connection: opening socket: " + host + "," + port);
206            }
207
208            inStream = new BufferedInputStream(sock.getInputStream());
209            outStream = new BufferedOutputStream(sock.getOutputStream());
210
211        } catch (InvocationTargetException e) {
212            Throwable realException = e.getTargetException();
213            // realException.printStackTrace();
214
215            CommunicationException ce =
216                new CommunicationException(host + ":" + port);
217            ce.setRootCause(realException);
218            throw ce;
219        } catch (Exception e) {
220            // We need to have a catch all here and
221            // ignore generic exceptions.
222            // Also catches all IO errors generated by socket creation.
223            CommunicationException ce =
224                new CommunicationException(host + ":" + port);
225            ce.setRootCause(e);
226            throw ce;
227        }
228
229        worker = Obj.helper.createThread(this);
230        worker.setDaemon(true);
231        worker.start();
232    }
233
234    /*
235     * Create an InetSocketAddress using the specified hostname and port number.
236     */
237    private InetSocketAddress createInetSocketAddress(String host, int port) {
238            return new InetSocketAddress(host, port);
239    }
240
241    /*
242     * Create a Socket object using the specified socket factory and time limit.
243     *
244     * If a timeout is supplied and unconnected sockets are supported then
245     * an unconnected socket is created and the timeout is applied when
246     * connecting the socket. If a timeout is supplied but unconnected sockets
247     * are not supported then the timeout is ignored and a connected socket
248     * is created.
249     */
250    private Socket createSocket(String host, int port, String socketFactory,
251            int connectTimeout) throws Exception {
252
253        Socket socket = null;
254
255        if (socketFactory != null) {
256
257            // create the factory
258
259            @SuppressWarnings("unchecked")
260            Class<? extends SocketFactory> socketFactoryClass =
261                (Class<? extends SocketFactory>)Obj.helper.loadClass(socketFactory);
262            Method getDefault =
263                socketFactoryClass.getMethod("getDefault", new Class<?>[]{});
264            SocketFactory factory = (SocketFactory) getDefault.invoke(null, new Object[]{});
265
266            // create the socket
267
268            if (connectTimeout > 0) {
269
270                InetSocketAddress endpoint =
271                        createInetSocketAddress(host, port);
272
273                // unconnected socket
274                socket = factory.createSocket();
275
276                if (debug) {
277                    System.err.println("Connection: creating socket with " +
278                            "a timeout using supplied socket factory");
279                }
280
281                // connected socket
282                socket.connect(endpoint, connectTimeout);
283            }
284
285            // continue (but ignore connectTimeout)
286            if (socket == null) {
287                if (debug) {
288                    System.err.println("Connection: creating socket using " +
289                        "supplied socket factory");
290                }
291                // connected socket
292                socket = factory.createSocket(host, port);
293            }
294        } else {
295
296            if (connectTimeout > 0) {
297
298                    InetSocketAddress endpoint = createInetSocketAddress(host, port);
299
300                    socket = new Socket();
301
302                    if (debug) {
303                        System.err.println("Connection: creating socket with " +
304                            "a timeout");
305                    }
306                    socket.connect(endpoint, connectTimeout);
307            }
308
309            // continue (but ignore connectTimeout)
310
311            if (socket == null) {
312                if (debug) {
313                    System.err.println("Connection: creating socket");
314                }
315                // connected socket
316                socket = new Socket(host, port);
317            }
318        }
319
320        // For LDAP connect timeouts on LDAP over SSL connections must treat
321        // the SSL handshake following socket connection as part of the timeout.
322        // So explicitly set a socket read timeout, trigger the SSL handshake,
323        // then reset the timeout.
324        if (connectTimeout > 0 && socket instanceof SSLSocket) {
325            SSLSocket sslSocket = (SSLSocket) socket;
326            int socketTimeout = sslSocket.getSoTimeout();
327
328            sslSocket.setSoTimeout(connectTimeout); // reuse full timeout value
329            sslSocket.startHandshake();
330            sslSocket.setSoTimeout(socketTimeout);
331        }
332
333        return socket;
334    }
335
336    ////////////////////////////////////////////////////////////////////////////
337    //
338    // Methods to IO to the LDAP server
339    //
340    ////////////////////////////////////////////////////////////////////////////
341
342    synchronized int getMsgId() {
343        return ++outMsgId;
344    }
345
346    LdapRequest writeRequest(BerEncoder ber, int msgId) throws IOException {
347        return writeRequest(ber, msgId, false /* pauseAfterReceipt */, -1);
348    }
349
350    LdapRequest writeRequest(BerEncoder ber, int msgId,
351        boolean pauseAfterReceipt) throws IOException {
352        return writeRequest(ber, msgId, pauseAfterReceipt, -1);
353    }
354
355    LdapRequest writeRequest(BerEncoder ber, int msgId,
356        boolean pauseAfterReceipt, int replyQueueCapacity) throws IOException {
357
358        LdapRequest req =
359            new LdapRequest(msgId, pauseAfterReceipt, replyQueueCapacity);
360        addRequest(req);
361
362        if (traceFile != null) {
363            Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(), 0, ber.getDataLen());
364        }
365
366
367        // unpause reader so that it can get response
368        // NOTE: Must do this before writing request, otherwise might
369        // create a race condition where the writer unblocks its own response
370        unpauseReader();
371
372        if (debug) {
373            System.err.println("Writing request to: " + outStream);
374        }
375
376        try {
377            synchronized (this) {
378                outStream.write(ber.getBuf(), 0, ber.getDataLen());
379                outStream.flush();
380            }
381        } catch (IOException e) {
382            cleanup(null, true);
383            throw (closureReason = e); // rethrow
384        }
385
386        return req;
387    }
388
389    /**
390     * Reads a reply; waits until one is ready.
391     */
392    BerDecoder readReply(LdapRequest ldr)
393            throws IOException, NamingException {
394        BerDecoder rber;
395
396        // Track down elapsed time to workaround spurious wakeups
397        long elapsedMilli = 0;
398        long elapsedNano = 0;
399
400        while (((rber = ldr.getReplyBer()) == null) &&
401                (readTimeout <= 0 || elapsedMilli < readTimeout))
402        {
403            try {
404                // If socket closed, don't even try
405                synchronized (this) {
406                    if (sock == null) {
407                        throw new ServiceUnavailableException(host + ":" + port +
408                            "; socket closed");
409                    }
410                }
411                synchronized (ldr) {
412                    // check if condition has changed since our last check
413                    rber = ldr.getReplyBer();
414                    if (rber == null) {
415                        if (readTimeout > 0) {  // Socket read timeout is specified
416                            long beginNano = System.nanoTime();
417
418                            // will be woken up before readTimeout if reply is
419                            // available
420                            ldr.wait(readTimeout - elapsedMilli);
421                            elapsedNano += (System.nanoTime() - beginNano);
422                            elapsedMilli += elapsedNano / 1000_000;
423                            elapsedNano %= 1000_000;
424
425                        } else {
426                            // no timeout is set so we wait infinitely until
427                            // a response is received
428                            // http://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP
429                            ldr.wait();
430                        }
431                    } else {
432                        break;
433                    }
434                }
435            } catch (InterruptedException ex) {
436                throw new InterruptedNamingException(
437                    "Interrupted during LDAP operation");
438            }
439        }
440
441        if ((rber == null) && (elapsedMilli >= readTimeout)) {
442            abandonRequest(ldr, null);
443            throw new NamingException("LDAP response read timed out, timeout used:"
444                            + readTimeout + "ms." );
445
446        }
447        return rber;
448    }
449
450
451    ////////////////////////////////////////////////////////////////////////////
452    //
453    // Methods to add, find, delete, and abandon requests made to server
454    //
455    ////////////////////////////////////////////////////////////////////////////
456
457    private synchronized void addRequest(LdapRequest ldapRequest) {
458
459        LdapRequest ldr = pendingRequests;
460        if (ldr == null) {
461            pendingRequests = ldapRequest;
462            ldapRequest.next = null;
463        } else {
464            ldapRequest.next = pendingRequests;
465            pendingRequests = ldapRequest;
466        }
467    }
468
469    synchronized LdapRequest findRequest(int msgId) {
470
471        LdapRequest ldr = pendingRequests;
472        while (ldr != null) {
473            if (ldr.msgId == msgId) {
474                return ldr;
475            }
476            ldr = ldr.next;
477        }
478        return null;
479
480    }
481
482    synchronized void removeRequest(LdapRequest req) {
483        LdapRequest ldr = pendingRequests;
484        LdapRequest ldrprev = null;
485
486        while (ldr != null) {
487            if (ldr == req) {
488                ldr.cancel();
489
490                if (ldrprev != null) {
491                    ldrprev.next = ldr.next;
492                } else {
493                    pendingRequests = ldr.next;
494                }
495                ldr.next = null;
496            }
497            ldrprev = ldr;
498            ldr = ldr.next;
499        }
500    }
501
502    void abandonRequest(LdapRequest ldr, Control[] reqCtls) {
503        // Remove from queue
504        removeRequest(ldr);
505
506        BerEncoder ber = new BerEncoder(256);
507        int abandonMsgId = getMsgId();
508
509        //
510        // build the abandon request.
511        //
512        try {
513            ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
514                ber.encodeInt(abandonMsgId);
515                ber.encodeInt(ldr.msgId, LdapClient.LDAP_REQ_ABANDON);
516
517                if (v3) {
518                    LdapClient.encodeControls(ber, reqCtls);
519                }
520            ber.endSeq();
521
522            if (traceFile != null) {
523                Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(), 0,
524                    ber.getDataLen());
525            }
526
527            synchronized (this) {
528                outStream.write(ber.getBuf(), 0, ber.getDataLen());
529                outStream.flush();
530            }
531
532        } catch (IOException ex) {
533            //System.err.println("ldap.abandon: " + ex);
534        }
535
536        // Don't expect any response for the abandon request.
537    }
538
539    synchronized void abandonOutstandingReqs(Control[] reqCtls) {
540        LdapRequest ldr = pendingRequests;
541
542        while (ldr != null) {
543            abandonRequest(ldr, reqCtls);
544            pendingRequests = ldr = ldr.next;
545        }
546    }
547
548    ////////////////////////////////////////////////////////////////////////////
549    //
550    // Methods to unbind from server and clear up resources when object is
551    // destroyed.
552    //
553    ////////////////////////////////////////////////////////////////////////////
554
555    private void ldapUnbind(Control[] reqCtls) {
556
557        BerEncoder ber = new BerEncoder(256);
558        int unbindMsgId = getMsgId();
559
560        //
561        // build the unbind request.
562        //
563
564        try {
565
566            ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
567                ber.encodeInt(unbindMsgId);
568                // IMPLICIT TAGS
569                ber.encodeByte(LdapClient.LDAP_REQ_UNBIND);
570                ber.encodeByte(0);
571
572                if (v3) {
573                    LdapClient.encodeControls(ber, reqCtls);
574                }
575            ber.endSeq();
576
577            if (traceFile != null) {
578                Ber.dumpBER(traceFile, traceTagOut, ber.getBuf(),
579                    0, ber.getDataLen());
580            }
581
582            synchronized (this) {
583                outStream.write(ber.getBuf(), 0, ber.getDataLen());
584                outStream.flush();
585            }
586
587        } catch (IOException ex) {
588            //System.err.println("ldap.unbind: " + ex);
589        }
590
591        // Don't expect any response for the unbind request.
592    }
593
594    /**
595     * @param reqCtls Possibly null request controls that accompanies the
596     *    abandon and unbind LDAP request.
597     * @param notifyParent true means to call parent LdapClient back, notifying
598     *    it that the connection has been closed; false means not to notify
599     *    parent. If LdapClient invokes cleanup(), notifyParent should be set to
600     *    false because LdapClient already knows that it is closing
601     *    the connection. If Connection invokes cleanup(), notifyParent should be
602     *    set to true because LdapClient needs to know about the closure.
603     */
604    void cleanup(Control[] reqCtls, boolean notifyParent) {
605        boolean nparent = false;
606
607        synchronized (this) {
608            useable = false;
609
610            if (sock != null) {
611                if (debug) {
612                    System.err.println("Connection: closing socket: " + host + "," + port);
613                }
614                try {
615                    if (!notifyParent) {
616                        abandonOutstandingReqs(reqCtls);
617                    }
618                    if (bound) {
619                        ldapUnbind(reqCtls);
620                    }
621                } finally {
622                    try {
623                        outStream.flush();
624                        sock.close();
625                        unpauseReader();
626                    } catch (IOException ie) {
627                        if (debug)
628                            System.err.println("Connection: problem closing socket: " + ie);
629                    }
630                    if (!notifyParent) {
631                        LdapRequest ldr = pendingRequests;
632                        while (ldr != null) {
633                            ldr.cancel();
634                            ldr = ldr.next;
635                        }
636                    }
637                    sock = null;
638                }
639                nparent = notifyParent;
640            }
641            if (nparent) {
642                LdapRequest ldr = pendingRequests;
643                while (ldr != null) {
644
645                    synchronized (ldr) {
646                        ldr.notify();
647                        ldr = ldr.next;
648                    }
649                }
650            }
651        }
652        if (nparent) {
653            parent.processConnectionClosure();
654        }
655    }
656
657
658    // Assume everything is "quiet"
659    // "synchronize" might lead to deadlock so don't synchronize method
660    // Use streamLock instead for synchronizing update to stream
661
662    synchronized public void replaceStreams(InputStream newIn, OutputStream newOut) {
663        if (debug) {
664            System.err.println("Replacing " + inStream + " with: " + newIn);
665            System.err.println("Replacing " + outStream + " with: " + newOut);
666        }
667
668        inStream = newIn;
669
670        // Cleanup old stream
671        try {
672            outStream.flush();
673        } catch (IOException ie) {
674            if (debug)
675                System.err.println("Connection: cannot flush outstream: " + ie);
676        }
677
678        // Replace stream
679        outStream = newOut;
680    }
681
682    /**
683     * Used by Connection thread to read inStream into a local variable.
684     * This ensures that there is no contention between the main thread
685     * and the Connection thread when the main thread updates inStream.
686     */
687    synchronized private InputStream getInputStream() {
688        return inStream;
689    }
690
691
692    ////////////////////////////////////////////////////////////////////////////
693    //
694    // Code for pausing/unpausing the reader thread ('worker')
695    //
696    ////////////////////////////////////////////////////////////////////////////
697
698    /*
699     * The main idea is to mark requests that need the reader thread to
700     * pause after getting the response. When the reader thread gets the response,
701     * it waits on a lock instead of returning to the read(). The next time a
702     * request is sent, the reader is automatically unblocked if necessary.
703     * Note that the reader must be unblocked BEFORE the request is sent.
704     * Otherwise, there is a race condition where the request is sent and
705     * the reader thread might read the response and be unblocked
706     * by writeRequest().
707     *
708     * This pause gives the main thread (StartTLS or SASL) an opportunity to
709     * update the reader's state (e.g., its streams) if necessary.
710     * The assumption is that the connection will remain quiet during this pause
711     * (i.e., no intervening requests being sent).
712     *<p>
713     * For dealing with StartTLS close,
714     * when the read() exits either due to EOF or an exception,
715     * the reader thread checks whether there is a new stream to read from.
716     * If so, then it reattempts the read. Otherwise, the EOF or exception
717     * is processed and the reader thread terminates.
718     * In a StartTLS close, the client first replaces the SSL IO streams with
719     * plain ones and then closes the SSL socket.
720     * If the reader thread attempts to read, or was reading, from
721     * the SSL socket (that is, it got to the read BEFORE replaceStreams()),
722     * the SSL socket close will cause the reader thread to
723     * get an EOF/exception and reexamine the input stream.
724     * If the reader thread sees a new stream, it reattempts the read.
725     * If the underlying socket is still alive, then the new read will succeed.
726     * If the underlying socket has been closed also, then the new read will
727     * fail and the reader thread exits.
728     * If the reader thread attempts to read, or was reading, from the plain
729     * socket (that is, it got to the read AFTER replaceStreams()), the
730     * SSL socket close will have no effect on the reader thread.
731     *
732     * The check for new stream is made only
733     * in the first attempt at reading a BER buffer; the reader should
734     * never be in midst of reading a buffer when a nonfatal close occurs.
735     * If this occurs, then the connection is in an inconsistent state and
736     * the safest thing to do is to shut it down.
737     */
738
739    private Object pauseLock = new Object();  // lock for reader to wait on while paused
740    private boolean paused = false;           // paused state of reader
741
742    /*
743     * Unpauses reader thread if it was paused
744     */
745    private void unpauseReader() throws IOException {
746        synchronized (pauseLock) {
747            if (paused) {
748                if (debug) {
749                    System.err.println("Unpausing reader; read from: " +
750                                        inStream);
751                }
752                paused = false;
753                pauseLock.notify();
754            }
755        }
756    }
757
758     /*
759     * Pauses reader so that it stops reading from the input stream.
760     * Reader blocks on pauseLock instead of read().
761     * MUST be called from within synchronized (pauseLock) clause.
762     */
763    private void pauseReader() throws IOException {
764        if (debug) {
765            System.err.println("Pausing reader;  was reading from: " +
766                                inStream);
767        }
768        paused = true;
769        try {
770            while (paused) {
771                pauseLock.wait(); // notified by unpauseReader
772            }
773        } catch (InterruptedException e) {
774            throw new InterruptedIOException(
775                    "Pause/unpause reader has problems.");
776        }
777    }
778
779
780    ////////////////////////////////////////////////////////////////////////////
781    //
782    // The LDAP Binding thread. It does the mux/demux of multiple requests
783    // on the same TCP connection.
784    //
785    ////////////////////////////////////////////////////////////////////////////
786
787
788    public void run() {
789        byte inbuf[];   // Buffer for reading incoming bytes
790        int inMsgId;    // Message id of incoming response
791        int bytesread;  // Number of bytes in inbuf
792        int br;         // Temp; number of bytes read from stream
793        int offset;     // Offset of where to store bytes in inbuf
794        int seqlen;     // Length of ASN sequence
795        int seqlenlen;  // Number of sequence length bytes
796        boolean eos;    // End of stream
797        BerDecoder retBer;    // Decoder for ASN.1 BER data from inbuf
798        InputStream in = null;
799
800        try {
801            while (true) {
802                try {
803                    // type and length (at most 128 octets for long form)
804                    inbuf = new byte[129];
805
806                    offset = 0;
807                    seqlen = 0;
808                    seqlenlen = 0;
809
810                    in = getInputStream();
811
812                    // check that it is the beginning of a sequence
813                    bytesread = in.read(inbuf, offset, 1);
814                    if (bytesread < 0) {
815                        if (in != getInputStream()) {
816                            continue;   // a new stream to try
817                        } else {
818                            break; // EOF
819                        }
820                    }
821
822                    if (inbuf[offset++] != (Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR))
823                        continue;
824
825                    // get length of sequence
826                    bytesread = in.read(inbuf, offset, 1);
827                    if (bytesread < 0)
828                        break; // EOF
829                    seqlen = inbuf[offset++];
830
831                    // if high bit is on, length is encoded in the
832                    // subsequent length bytes and the number of length bytes
833                    // is equal to & 0x80 (i.e. length byte with high bit off).
834                    if ((seqlen & 0x80) == 0x80) {
835                        seqlenlen = seqlen & 0x7f;  // number of length bytes
836
837                        bytesread = 0;
838                        eos = false;
839
840                        // Read all length bytes
841                        while (bytesread < seqlenlen) {
842                            br = in.read(inbuf, offset+bytesread,
843                                seqlenlen-bytesread);
844                            if (br < 0) {
845                                eos = true;
846                                break; // EOF
847                            }
848                            bytesread += br;
849                        }
850
851                        // end-of-stream reached before length bytes are read
852                        if (eos)
853                            break;  // EOF
854
855                        // Add contents of length bytes to determine length
856                        seqlen = 0;
857                        for( int i = 0; i < seqlenlen; i++) {
858                            seqlen = (seqlen << 8) + (inbuf[offset+i] & 0xff);
859                        }
860                        offset += bytesread;
861                    }
862
863                    // read in seqlen bytes
864                    byte[] left = readFully(in, seqlen);
865                    inbuf = Arrays.copyOf(inbuf, offset + left.length);
866                    System.arraycopy(left, 0, inbuf, offset, left.length);
867                    offset += left.length;
868/*
869if (dump > 0) {
870System.err.println("seqlen: " + seqlen);
871System.err.println("bufsize: " + offset);
872System.err.println("bytesleft: " + bytesleft);
873System.err.println("bytesread: " + bytesread);
874}
875*/
876
877
878                    try {
879                        retBer = new BerDecoder(inbuf, 0, offset);
880
881                        if (traceFile != null) {
882                            Ber.dumpBER(traceFile, traceTagIn, inbuf, 0, offset);
883                        }
884
885                        retBer.parseSeq(null);
886                        inMsgId = retBer.parseInt();
887                        retBer.reset(); // reset offset
888
889                        boolean needPause = false;
890
891                        if (inMsgId == 0) {
892                            // Unsolicited Notification
893                            parent.processUnsolicited(retBer);
894                        } else {
895                            LdapRequest ldr = findRequest(inMsgId);
896
897                            if (ldr != null) {
898
899                                /**
900                                 * Grab pauseLock before making reply available
901                                 * to ensure that reader goes into paused state
902                                 * before writer can attempt to unpause reader
903                                 */
904                                synchronized (pauseLock) {
905                                    needPause = ldr.addReplyBer(retBer);
906                                    if (needPause) {
907                                        /*
908                                         * Go into paused state; release
909                                         * pauseLock
910                                         */
911                                        pauseReader();
912                                    }
913
914                                    // else release pauseLock
915                                }
916                            } else {
917                                // System.err.println("Cannot find" +
918                                //              "LdapRequest for " + inMsgId);
919                            }
920                        }
921                    } catch (Ber.DecodeException e) {
922                        //System.err.println("Cannot parse Ber");
923                    }
924                } catch (IOException ie) {
925                    if (debug) {
926                        System.err.println("Connection: Inside Caught " + ie);
927                        ie.printStackTrace();
928                    }
929
930                    if (in != getInputStream()) {
931                        // A new stream to try
932                        // Go to top of loop and continue
933                    } else {
934                        if (debug) {
935                            System.err.println("Connection: rethrowing " + ie);
936                        }
937                        throw ie;  // rethrow exception
938                    }
939                }
940            }
941
942            if (debug) {
943                System.err.println("Connection: end-of-stream detected: "
944                    + in);
945            }
946        } catch (IOException ex) {
947            if (debug) {
948                System.err.println("Connection: Caught " + ex);
949            }
950            closureReason = ex;
951        } finally {
952            cleanup(null, true); // cleanup
953        }
954        if (debug) {
955            System.err.println("Connection: Thread Exiting");
956        }
957    }
958
959    private static byte[] readFully(InputStream is, int length)
960        throws IOException
961    {
962        byte[] buf = new byte[Math.min(length, 8192)];
963        int nread = 0;
964        while (nread < length) {
965            int bytesToRead;
966            if (nread >= buf.length) {  // need to allocate a larger buffer
967                bytesToRead = Math.min(length - nread, buf.length + 8192);
968                if (buf.length < nread + bytesToRead) {
969                    buf = Arrays.copyOf(buf, nread + bytesToRead);
970                }
971            } else {
972                bytesToRead = buf.length - nread;
973            }
974            int count = is.read(buf, nread, bytesToRead);
975            if (count < 0) {
976                if (buf.length != nread)
977                    buf = Arrays.copyOf(buf, nread);
978                break;
979            }
980            nread += count;
981        }
982        return buf;
983    }
984
985    // This code must be uncommented to run the LdapAbandonTest.
986    /*public void sendSearchReqs(String dn, int numReqs) {
987        int i;
988        String attrs[] = null;
989        for(i = 1; i <= numReqs; i++) {
990            BerEncoder ber = new BerEncoder(2048);
991
992            try {
993            ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
994                ber.encodeInt(i);
995                ber.beginSeq(LdapClient.LDAP_REQ_SEARCH);
996                    ber.encodeString(dn == null ? "" : dn);
997                    ber.encodeInt(0, LdapClient.LBER_ENUMERATED);
998                    ber.encodeInt(3, LdapClient.LBER_ENUMERATED);
999                    ber.encodeInt(0);
1000                    ber.encodeInt(0);
1001                    ber.encodeBoolean(true);
1002                    LdapClient.encodeFilter(ber, "");
1003                    ber.beginSeq(Ber.ASN_SEQUENCE | Ber.ASN_CONSTRUCTOR);
1004                        ber.encodeStringArray(attrs);
1005                    ber.endSeq();
1006                ber.endSeq();
1007            ber.endSeq();
1008            writeRequest(ber, i);
1009            //System.err.println("wrote request " + i);
1010            } catch (Exception ex) {
1011            //System.err.println("ldap.search: Caught " + ex + " building req");
1012            }
1013
1014        }
1015    } */
1016}
1017