1/*
2 * Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package com.sun.jmx.remote.internal;
26
27import java.io.IOException;
28import java.io.NotSerializableException;
29
30import java.util.ArrayList;
31import java.util.HashMap;
32import java.util.List;
33import java.util.Map;
34import java.util.concurrent.Executor;
35
36import java.security.AccessControlContext;
37import java.security.AccessController;
38import java.security.PrivilegedAction;
39import javax.security.auth.Subject;
40
41import javax.management.Notification;
42import javax.management.NotificationListener;
43import javax.management.NotificationFilter;
44import javax.management.ObjectName;
45import javax.management.MBeanServerNotification;
46import javax.management.InstanceNotFoundException;
47import javax.management.ListenerNotFoundException;
48
49import javax.management.remote.NotificationResult;
50import javax.management.remote.TargetedNotification;
51
52import com.sun.jmx.remote.util.ClassLogger;
53import com.sun.jmx.remote.util.EnvHelp;
54import java.lang.reflect.UndeclaredThrowableException;
55import java.util.concurrent.RejectedExecutionException;
56
57
58public abstract class ClientNotifForwarder {
59
60    private final AccessControlContext acc;
61
62    public ClientNotifForwarder(Map<String, ?> env) {
63        this(null, env);
64    }
65
66    private static int threadId;
67
68    /* An Executor that allows at most one executing and one pending
69       Runnable.  It uses at most one thread -- as soon as there is
70       no pending Runnable the thread can exit.  Another thread is
71       created as soon as there is a new pending Runnable.  This
72       Executor is adapted for use in a situation where each Runnable
73       usually schedules up another Runnable.  On return from the
74       first one, the second one is immediately executed.  So this
75       just becomes a complicated way to write a while loop, but with
76       the advantage that you can replace it with another Executor,
77       for instance one that you are using to execute a bunch of other
78       unrelated work.
79
80       You might expect that a java.util.concurrent.ThreadPoolExecutor
81       with corePoolSize=0 and maximumPoolSize=1 would have the same
82       behavior, but it does not.  A ThreadPoolExecutor only creates
83       a new thread when a new task is submitted and the number of
84       existing threads is < corePoolSize.  This can never happen when
85       corePoolSize=0, so new threads are never created.  Surprising,
86       but there you are.
87    */
88    private static class LinearExecutor implements Executor {
89        public synchronized void execute(Runnable command) {
90            if (this.command != null)
91                throw new IllegalArgumentException("More than one command");
92            this.command = command;
93            if (thread == null) {
94                thread = new Thread(
95                    null,
96                    ()-> {
97                        while (true) {
98                            Runnable r;
99                            synchronized (LinearExecutor.this) {
100                                if (LinearExecutor.this.command == null) {
101                                    thread = null;
102                                    return;
103                                } else {
104                                    r = LinearExecutor.this.command;
105                                    LinearExecutor.this.command = null;
106                                }
107                            }
108                            r.run();
109                        }
110                    },
111                    "ClientNotifForwarder-" + ++threadId,
112                    0,
113                    false
114                );
115                thread.setDaemon(true);
116                thread.start();
117            }
118        }
119
120        private Runnable command;
121        private Thread thread;
122    }
123
124    public ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env) {
125        maxNotifications = EnvHelp.getMaxFetchNotifNumber(env);
126        timeout = EnvHelp.getFetchTimeout(env);
127
128        /* You can supply an Executor in which the remote call to
129           fetchNotifications will be made.  The Executor's execute
130           method reschedules another task, so you must not use
131           an Executor that executes tasks in the caller's thread.  */
132        Executor ex = (Executor)
133            env.get("jmx.remote.x.fetch.notifications.executor");
134        if (ex == null)
135            ex = new LinearExecutor();
136        else if (logger.traceOn())
137            logger.trace("ClientNotifForwarder", "executor is " + ex);
138
139        this.defaultClassLoader = defaultClassLoader;
140        this.executor = ex;
141        this.acc = AccessController.getContext();
142    }
143
144    /**
145     * Called to fetch notifications from a server.
146     */
147    abstract protected NotificationResult fetchNotifs(long clientSequenceNumber,
148                                                      int maxNotifications,
149                                                      long timeout)
150            throws IOException, ClassNotFoundException;
151
152    abstract protected Integer addListenerForMBeanRemovedNotif()
153        throws IOException, InstanceNotFoundException;
154
155    abstract protected void removeListenerForMBeanRemovedNotif(Integer id)
156        throws IOException, InstanceNotFoundException,
157               ListenerNotFoundException;
158
159    /**
160     * Used to send out a notification about lost notifs
161     */
162    abstract protected void lostNotifs(String message, long number);
163
164
165    public synchronized void addNotificationListener(Integer listenerID,
166                                        ObjectName name,
167                                        NotificationListener listener,
168                                        NotificationFilter filter,
169                                        Object handback,
170                                        Subject delegationSubject)
171            throws IOException, InstanceNotFoundException {
172
173        if (logger.traceOn()) {
174            logger.trace("addNotificationListener",
175                         "Add the listener "+listener+" at "+name);
176        }
177
178        infoList.put(listenerID,
179                     new ClientListenerInfo(listenerID,
180                                            name,
181                                            listener,
182                                            filter,
183                                            handback,
184                                            delegationSubject));
185
186
187        init(false);
188    }
189
190    public synchronized Integer[]
191        removeNotificationListener(ObjectName name,
192                                   NotificationListener listener)
193        throws ListenerNotFoundException, IOException {
194
195        beforeRemove();
196
197        if (logger.traceOn()) {
198            logger.trace("removeNotificationListener",
199                         "Remove the listener "+listener+" from "+name);
200        }
201
202        List<Integer> ids = new ArrayList<Integer>();
203        List<ClientListenerInfo> values =
204                new ArrayList<ClientListenerInfo>(infoList.values());
205        for (int i=values.size()-1; i>=0; i--) {
206            ClientListenerInfo li = values.get(i);
207
208            if (li.sameAs(name, listener)) {
209                ids.add(li.getListenerID());
210
211                infoList.remove(li.getListenerID());
212            }
213        }
214
215        if (ids.isEmpty())
216            throw new ListenerNotFoundException("Listener not found");
217
218        return ids.toArray(new Integer[0]);
219    }
220
221    public synchronized Integer
222        removeNotificationListener(ObjectName name,
223                                   NotificationListener listener,
224                                   NotificationFilter filter,
225                                   Object handback)
226            throws ListenerNotFoundException, IOException {
227
228        if (logger.traceOn()) {
229            logger.trace("removeNotificationListener",
230                         "Remove the listener "+listener+" from "+name);
231        }
232
233        beforeRemove();
234
235        Integer id = null;
236
237        List<ClientListenerInfo> values =
238                new ArrayList<ClientListenerInfo>(infoList.values());
239        for (int i=values.size()-1; i>=0; i--) {
240            ClientListenerInfo li = values.get(i);
241            if (li.sameAs(name, listener, filter, handback)) {
242                id=li.getListenerID();
243
244                infoList.remove(id);
245
246                break;
247            }
248        }
249
250        if (id == null)
251            throw new ListenerNotFoundException("Listener not found");
252
253        return id;
254    }
255
256    public synchronized Integer[] removeNotificationListener(ObjectName name) {
257        if (logger.traceOn()) {
258            logger.trace("removeNotificationListener",
259                         "Remove all listeners registered at "+name);
260        }
261
262        List<Integer> ids = new ArrayList<Integer>();
263
264        List<ClientListenerInfo> values =
265                new ArrayList<ClientListenerInfo>(infoList.values());
266        for (int i=values.size()-1; i>=0; i--) {
267            ClientListenerInfo li = values.get(i);
268            if (li.sameAs(name)) {
269                ids.add(li.getListenerID());
270
271                infoList.remove(li.getListenerID());
272            }
273        }
274
275        return ids.toArray(new Integer[0]);
276    }
277
278    /*
279     * Called when a connector is doing reconnection. Like <code>postReconnection</code>,
280     * this method is intended to be called only by a client connector:
281     * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
282     * Call this method will set the flag beingReconnection to <code>true</code>,
283     * and the thread used to fetch notifis will be stopped, a new thread can be
284     * created only after the method <code>postReconnection</code> is called.
285     *
286     * It is caller's responsiblity to not re-call this method before calling
287     * <code>postReconnection</code>.
288     */
289    public synchronized ClientListenerInfo[] preReconnection() throws IOException {
290        if (state == TERMINATED || beingReconnected) { // should never
291            throw new IOException("Illegal state.");
292        }
293
294        final ClientListenerInfo[] tmp =
295            infoList.values().toArray(new ClientListenerInfo[0]);
296
297
298        beingReconnected = true;
299
300        infoList.clear();
301
302        return tmp;
303    }
304
305    /**
306     * Called after reconnection is finished.
307     * This method is intended to be called only by a client connector:
308     * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
309     */
310    public synchronized void postReconnection(ClientListenerInfo[] listenerInfos)
311        throws IOException {
312
313        if (state == TERMINATED) {
314            return;
315        }
316
317        while (state == STOPPING) {
318            try {
319                wait();
320            } catch (InterruptedException ire) {
321                IOException ioe = new IOException(ire.toString());
322                EnvHelp.initCause(ioe, ire);
323                throw ioe;
324            }
325        }
326
327        final boolean trace = logger.traceOn();
328        final int len   = listenerInfos.length;
329
330        for (int i=0; i<len; i++) {
331            if (trace) {
332                logger.trace("addNotificationListeners",
333                             "Add a listener at "+
334                             listenerInfos[i].getListenerID());
335            }
336
337            infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]);
338        }
339
340        beingReconnected = false;
341        notifyAll();
342
343        if (currentFetchThread == Thread.currentThread() ||
344              state == STARTING || state == STARTED) { // doing or waiting reconnection
345              // only update mbeanRemovedNotifID
346            try {
347                mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
348            } catch (Exception e) {
349                final String msg =
350                    "Failed to register a listener to the mbean " +
351                    "server: the client will not do clean when an MBean " +
352                    "is unregistered";
353                if (logger.traceOn()) {
354                    logger.trace("init", msg, e);
355                }
356            }
357        } else {
358              while (state == STOPPING) {
359                  try {
360                      wait();
361                  } catch (InterruptedException ire) {
362                      IOException ioe = new IOException(ire.toString());
363                      EnvHelp.initCause(ioe, ire);
364                      throw ioe;
365                  }
366              }
367
368              if (listenerInfos.length > 0) { // old listeners are re-added
369                  init(true); // not update clientSequenceNumber
370              } else if (infoList.size() > 0) { // only new listeners added during reconnection
371                  init(false); // need update clientSequenceNumber
372              }
373          }
374    }
375
376    public synchronized void terminate() {
377        if (state == TERMINATED) {
378            return;
379        }
380
381        if (logger.traceOn()) {
382            logger.trace("terminate", "Terminating...");
383        }
384
385        if (state == STARTED) {
386           infoList.clear();
387        }
388
389        setState(TERMINATED);
390    }
391
392
393    // -------------------------------------------------
394    // private classes
395    // -------------------------------------------------
396    //
397
398    private class NotifFetcher implements Runnable {
399
400        private volatile boolean alreadyLogged = false;
401
402        private void logOnce(String msg, SecurityException x) {
403            if (alreadyLogged) return;
404            // Log only once.
405            logger.config("setContextClassLoader",msg);
406            if (x != null) logger.fine("setContextClassLoader", x);
407            alreadyLogged = true;
408        }
409
410        // Set new context class loader, returns previous one.
411        private final ClassLoader setContextClassLoader(final ClassLoader loader) {
412            final AccessControlContext ctxt = ClientNotifForwarder.this.acc;
413            // if ctxt is null, log a config message and throw a
414            // SecurityException.
415            if (ctxt == null) {
416                logOnce("AccessControlContext must not be null.",null);
417                throw new SecurityException("AccessControlContext must not be null");
418            }
419            return AccessController.doPrivileged(
420                new PrivilegedAction<ClassLoader>() {
421                    public ClassLoader run() {
422                        try {
423                            // get context class loader - may throw
424                            // SecurityException - though unlikely.
425                            final ClassLoader previous =
426                                Thread.currentThread().getContextClassLoader();
427
428                            // if nothing needs to be done, break here...
429                            if (loader == previous) return previous;
430
431                            // reset context class loader - may throw
432                            // SecurityException
433                            Thread.currentThread().setContextClassLoader(loader);
434                            return previous;
435                        } catch (SecurityException x) {
436                            logOnce("Permission to set ContextClassLoader missing. " +
437                                    "Notifications will not be dispatched. " +
438                                    "Please check your Java policy configuration: " +
439                                    x, x);
440                            throw x;
441                        }
442                    }
443                }, ctxt);
444        }
445
446        public void run() {
447            final ClassLoader previous;
448            if (defaultClassLoader != null) {
449                previous = setContextClassLoader(defaultClassLoader);
450            } else {
451                previous = null;
452            }
453            try {
454                doRun();
455            } finally {
456                if (defaultClassLoader != null) {
457                    setContextClassLoader(previous);
458                }
459            }
460        }
461
462        private void doRun() {
463            synchronized (ClientNotifForwarder.this) {
464                currentFetchThread = Thread.currentThread();
465
466                if (state == STARTING) {
467                    setState(STARTED);
468                }
469            }
470
471
472            NotificationResult nr = null;
473            if (!shouldStop() && (nr = fetchNotifs()) != null) {
474                // nr == null means got exception
475
476                final TargetedNotification[] notifs =
477                    nr.getTargetedNotifications();
478                final int len = notifs.length;
479                final Map<Integer, ClientListenerInfo> listeners;
480                final Integer myListenerID;
481
482                long missed = 0;
483
484                synchronized(ClientNotifForwarder.this) {
485                    // check sequence number.
486                    //
487                    if (clientSequenceNumber >= 0) {
488                        missed = nr.getEarliestSequenceNumber() -
489                            clientSequenceNumber;
490                    }
491
492                    clientSequenceNumber = nr.getNextSequenceNumber();
493
494                    listeners = new HashMap<Integer, ClientListenerInfo>();
495
496                    for (int i = 0 ; i < len ; i++) {
497                        final TargetedNotification tn = notifs[i];
498                        final Integer listenerID = tn.getListenerID();
499
500                        // check if an mbean unregistration notif
501                        if (!listenerID.equals(mbeanRemovedNotifID)) {
502                            final ClientListenerInfo li = infoList.get(listenerID);
503                            if (li != null) {
504                                listeners.put(listenerID, li);
505                            }
506                            continue;
507                        }
508                        final Notification notif = tn.getNotification();
509                        final String unreg =
510                            MBeanServerNotification.UNREGISTRATION_NOTIFICATION;
511                        if (notif instanceof MBeanServerNotification &&
512                            notif.getType().equals(unreg)) {
513
514                            MBeanServerNotification mbsn =
515                                (MBeanServerNotification) notif;
516                            ObjectName name = mbsn.getMBeanName();
517
518                            removeNotificationListener(name);
519                        }
520                    }
521                    myListenerID = mbeanRemovedNotifID;
522                }
523
524                if (missed > 0) {
525                    final String msg =
526                        "May have lost up to " + missed +
527                        " notification" + (missed == 1 ? "" : "s");
528                    lostNotifs(msg, missed);
529                    logger.trace("NotifFetcher.run", msg);
530                }
531
532                // forward
533                for (int i = 0 ; i < len ; i++) {
534                    final TargetedNotification tn = notifs[i];
535                    dispatchNotification(tn,myListenerID,listeners);
536                }
537            }
538
539            synchronized (ClientNotifForwarder.this) {
540                currentFetchThread = null;
541            }
542
543            if (nr == null) {
544                if (logger.traceOn()) {
545                    logger.trace("NotifFetcher-run",
546                            "Recieved null object as notifs, stops fetching because the "
547                                    + "notification server is terminated.");
548                }
549            }
550            if (nr == null || shouldStop()) {
551                // tell that the thread is REALLY stopped
552                setState(STOPPED);
553
554                try {
555                      removeListenerForMBeanRemovedNotif(mbeanRemovedNotifID);
556                } catch (Exception e) {
557                    if (logger.traceOn()) {
558                        logger.trace("NotifFetcher-run",
559                                "removeListenerForMBeanRemovedNotif", e);
560                    }
561                }
562            } else {
563                try {
564                    executor.execute(this);
565                } catch (Exception e) {
566                    if (isRejectedExecutionException(e)) {
567                        // We reached here because the executor was shutdown.
568                        // If executor was supplied by client, then it was shutdown
569                        // abruptly or JMXConnector was shutdown along with executor
570                        // while this thread was suspended at L564.
571                        if (!(executor instanceof LinearExecutor)) {
572                            // Spawn new executor that will do cleanup if JMXConnector is closed
573                            // or keep notif system running otherwise
574                            executor = new LinearExecutor();
575                            executor.execute(this);
576                        }
577                    } else {
578                        throw e;
579                    }
580                }
581            }
582        }
583
584        private boolean isRejectedExecutionException(Exception e) {
585            Throwable cause = e;
586            while (cause != null) {
587                if (cause instanceof RejectedExecutionException) {
588                    return true;
589                }
590                cause = cause.getCause();
591            }
592            return false;
593        }
594
595        void dispatchNotification(TargetedNotification tn,
596                                  Integer myListenerID,
597                                  Map<Integer, ClientListenerInfo> listeners) {
598            final Notification notif = tn.getNotification();
599            final Integer listenerID = tn.getListenerID();
600
601            if (listenerID.equals(myListenerID)) return;
602            final ClientListenerInfo li = listeners.get(listenerID);
603
604            if (li == null) {
605                logger.trace("NotifFetcher.dispatch",
606                             "Listener ID not in map");
607                return;
608            }
609
610            NotificationListener l = li.getListener();
611            Object h = li.getHandback();
612            try {
613                l.handleNotification(notif, h);
614            } catch (RuntimeException e) {
615                final String msg =
616                    "Failed to forward a notification " +
617                    "to a listener";
618                logger.trace("NotifFetcher-run", msg, e);
619            }
620
621        }
622
623        private NotificationResult fetchNotifs() {
624            try {
625                NotificationResult nr = ClientNotifForwarder.this.
626                    fetchNotifs(clientSequenceNumber,maxNotifications,
627                                timeout);
628
629                if (logger.traceOn()) {
630                    logger.trace("NotifFetcher-run",
631                                 "Got notifications from the server: "+nr);
632                }
633
634                return nr;
635            } catch (ClassNotFoundException | NotSerializableException e) {
636                logger.trace("NotifFetcher.fetchNotifs", e);
637                return fetchOneNotif();
638            } catch (IOException ioe) {
639                if (!shouldStop()) {
640                    logger.error("NotifFetcher-run",
641                                 "Failed to fetch notification, " +
642                                 "stopping thread. Error is: " + ioe, ioe);
643                    logger.debug("NotifFetcher-run",ioe);
644                }
645
646                // no more fetching
647                return null;
648            }
649        }
650
651        /* Fetch one notification when we suspect that it might be a
652           notification that we can't deserialize (because of a
653           missing class).  First we ask for 0 notifications with 0
654           timeout.  This allows us to skip sequence numbers for
655           notifications that don't match our filters.  Then we ask
656           for one notification.  If that produces a
657           ClassNotFoundException, NotSerializableException or
658           UnmarshalException, we increase our sequence number and ask again.
659           Eventually we will either get a successful notification, or a
660           return with 0 notifications.  In either case we can return a
661           NotificationResult.  This algorithm works (albeit less
662           well) even if the server implementation doesn't optimize a
663           request for 0 notifications to skip sequence numbers for
664           notifications that don't match our filters.
665
666           If we had at least one
667           ClassNotFoundException/NotSerializableException/UnmarshalException,
668           then we must emit a JMXConnectionNotification.LOST_NOTIFS.
669        */
670        private NotificationResult fetchOneNotif() {
671            ClientNotifForwarder cnf = ClientNotifForwarder.this;
672
673            long startSequenceNumber = clientSequenceNumber;
674
675            int notFoundCount = 0;
676
677            NotificationResult result = null;
678            long firstEarliest = -1;
679
680            while (result == null && !shouldStop()) {
681                NotificationResult nr;
682
683                try {
684                    // 0 notifs to update startSequenceNumber
685                    nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);
686                } catch (ClassNotFoundException e) {
687                    logger.warning("NotifFetcher.fetchOneNotif",
688                                   "Impossible exception: " + e);
689                    logger.debug("NotifFetcher.fetchOneNotif",e);
690                    return null;
691                } catch (IOException e) {
692                    if (!shouldStop())
693                        logger.trace("NotifFetcher.fetchOneNotif", e);
694                    return null;
695                }
696
697                if (shouldStop() || nr == null)
698                    return null;
699
700                startSequenceNumber = nr.getNextSequenceNumber();
701                if (firstEarliest < 0)
702                    firstEarliest = nr.getEarliestSequenceNumber();
703
704                try {
705                    // 1 notif to skip possible missing class
706                    result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);
707                } catch (ClassNotFoundException | NotSerializableException e) {
708                    logger.warning("NotifFetcher.fetchOneNotif",
709                                   "Failed to deserialize a notification: "+e.toString());
710                    if (logger.traceOn()) {
711                        logger.trace("NotifFetcher.fetchOneNotif",
712                                     "Failed to deserialize a notification.", e);
713                    }
714
715                    notFoundCount++;
716                    startSequenceNumber++;
717                } catch (Exception e) {
718                    if (!shouldStop())
719                        logger.trace("NotifFetcher.fetchOneNotif", e);
720                    return null;
721                }
722            }
723
724            if (notFoundCount > 0) {
725                final String msg =
726                    "Dropped " + notFoundCount + " notification" +
727                    (notFoundCount == 1 ? "" : "s") +
728                    " because classes were missing locally or incompatible";
729                lostNotifs(msg, notFoundCount);
730                // Even if result.getEarliestSequenceNumber() is now greater than
731                // it was initially, meaning some notifs have been dropped
732                // from the buffer, we don't want the caller to see that
733                // because it is then likely to renotify about the lost notifs.
734                // So we put back the first value of earliestSequenceNumber
735                // that we saw.
736                if (result != null) {
737                    result = new NotificationResult(
738                            firstEarliest, result.getNextSequenceNumber(),
739                            result.getTargetedNotifications());
740                }
741            }
742
743            return result;
744        }
745
746        private boolean shouldStop() {
747            synchronized (ClientNotifForwarder.this) {
748                if (state != STARTED) {
749                    return true;
750                } else if (infoList.size() == 0) {
751                    // no more listener, stop fetching
752                    setState(STOPPING);
753
754                    return true;
755                }
756
757                return false;
758            }
759        }
760    }
761
762
763// -------------------------------------------------
764// private methods
765// -------------------------------------------------
766    private synchronized void setState(int newState) {
767        if (state == TERMINATED) {
768            return;
769        }
770
771        state = newState;
772        this.notifyAll();
773    }
774
    /*
     * Called to decide whether a thread needs to be started for fetching notifs.
     * <P>The parameter reconnected decides whether to initialize the clientSequenceNumber;
     * initializing the clientSequenceNumber means ignoring all notifications that arrived before.
     * After a reconnection, we do not initialize it, in order to get all notifications that arrived
     * during the reconnection. This may cause newly registered listeners to receive some
     * notifications that arrived before their registration.
     */
783    private synchronized void init(boolean reconnected) throws IOException {
784        switch (state) {
785        case STARTED:
786            return;
787        case STARTING:
788            return;
789        case TERMINATED:
790            throw new IOException("The ClientNotifForwarder has been terminated.");
791        case STOPPING:
792            if (beingReconnected == true) {
793                // wait for another thread to do, which is doing reconnection
794                return;
795            }
796
797            while (state == STOPPING) { // make sure only one fetching thread.
798                try {
799                    wait();
800                } catch (InterruptedException ire) {
801                    IOException ioe = new IOException(ire.toString());
802                    EnvHelp.initCause(ioe, ire);
803
804                    throw ioe;
805                }
806            }
807
808            // re-call this method to check the state again,
809            // the state can be other value like TERMINATED.
810            init(reconnected);
811
812            return;
813        case STOPPED:
814            if (beingReconnected == true) {
815                // wait for another thread to do, which is doing reconnection
816                return;
817            }
818
819            if (logger.traceOn()) {
820                logger.trace("init", "Initializing...");
821            }
822
823            // init the clientSequenceNumber if not reconnected.
824            if (!reconnected) {
825                try {
826                    NotificationResult nr = fetchNotifs(-1, 0, 0);
827
828                    if (state != STOPPED) { // JDK-8038940
829                                            // reconnection must happen during
830                                            // fetchNotifs(-1, 0, 0), and a new
831                                            // thread takes over the fetching job
832                        return;
833                    }
834
835                    clientSequenceNumber = nr.getNextSequenceNumber();
836                } catch (ClassNotFoundException e) {
837                    // can't happen
838                    logger.warning("init", "Impossible exception: "+ e);
839                    logger.debug("init",e);
840                }
841            }
842
843            // for cleaning
844            try {
845                mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
846            } catch (Exception e) {
847                final String msg =
848                    "Failed to register a listener to the mbean " +
849                    "server: the client will not do clean when an MBean " +
850                    "is unregistered";
851                if (logger.traceOn()) {
852                    logger.trace("init", msg, e);
853                }
854            }
855
856            setState(STARTING);
857
858            // start fetching
859            executor.execute(new NotifFetcher());
860
861            return;
862        default:
863            // should not
864            throw new IOException("Unknown state.");
865        }
866    }
867
868    /**
869     * Import: should not remove a listener during reconnection, the reconnection
870     * needs to change the listener list and that will possibly make removal fail.
871     */
872    private synchronized void beforeRemove() throws IOException {
873        while (beingReconnected) {
874            if (state == TERMINATED) {
875                throw new IOException("Terminated.");
876            }
877
878            try {
879                wait();
880            } catch (InterruptedException ire) {
881                IOException ioe = new IOException(ire.toString());
882                EnvHelp.initCause(ioe, ire);
883
884                throw ioe;
885            }
886        }
887
888        if (state == TERMINATED) {
889            throw new IOException("Terminated.");
890        }
891    }
892
893// -------------------------------------------------
894// private variables
895// -------------------------------------------------
896
897    private final ClassLoader defaultClassLoader;
898    private Executor executor;
899
900    private final Map<Integer, ClientListenerInfo> infoList =
901            new HashMap<Integer, ClientListenerInfo>();
902
903    // notif stuff
904    private long clientSequenceNumber = -1;
905    private final int maxNotifications;
906    private final long timeout;
907    private Integer mbeanRemovedNotifID = null;
908    private Thread currentFetchThread;
909
910    // state
911    /**
912     * This state means that a thread is being created for fetching and forwarding notifications.
913     */
914    private static final int STARTING = 0;
915
916    /**
917     * This state tells that a thread has been started for fetching and forwarding notifications.
918     */
919    private static final int STARTED = 1;
920
921    /**
922     * This state means that the fetching thread is informed to stop.
923     */
924    private static final int STOPPING = 2;
925
926    /**
927     * This state means that the fetching thread is already stopped.
928     */
929    private static final int STOPPED = 3;
930
931    /**
932     * This state means that this object is terminated and no more thread will be created
933     * for fetching notifications.
934     */
935    private static final int TERMINATED = 4;
936
937    private int state = STOPPED;
938
939    /**
940     * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
941     * is doing reconnection.
942     * This variable will be set to true by the method <code>preReconnection</code>, and set
943     * to false by <code>postReconnection</code>.
944     * When beingReconnected == true, no thread will be created for fetching notifications.
945     */
946    private boolean beingReconnected = false;
947
948    private static final ClassLogger logger =
949        new ClassLogger("javax.management.remote.misc",
950                        "ClientNotifForwarder");
951}
952