/*
 * Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package com.sun.jmx.remote.internal;

import java.io.IOException;
import java.io.NotSerializableException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import javax.security.auth.Subject;

import javax.management.Notification;
import javax.management.NotificationListener;
import javax.management.NotificationFilter;
import javax.management.ObjectName;
import javax.management.MBeanServerNotification;
import javax.management.InstanceNotFoundException;
import javax.management.ListenerNotFoundException;

import javax.management.remote.NotificationResult;
import javax.management.remote.TargetedNotification;

import com.sun.jmx.remote.util.ClassLogger;
import com.sun.jmx.remote.util.EnvHelp;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.RejectedExecutionException;


/**
 * Client-side helper that repeatedly fetches notifications from a server
 * (through the abstract {@code fetchNotifs} hook) and dispatches them to the
 * listeners registered through {@code addNotificationListener}.
 *
 * <p>The object moves between the states {@code STOPPED}, {@code STARTING},
 * {@code STARTED}, {@code STOPPING} and {@code TERMINATED}; all state
 * transitions happen while holding this object's monitor.</p>
 */
public abstract class ClientNotifForwarder {

    /** Access control context captured when this forwarder was constructed. */
    private final AccessControlContext acc;

    /**
     * Creates a forwarder with no default class loader.
     *
     * @param env connector environment; see the two-argument constructor
     */
    public ClientNotifForwarder(Map<String, ?> env) {
        this(null, env);
    }

    /*
     * Counter used to build the names of the LinearExecutor threads.  It is
     * shared by every LinearExecutor instance, so it must not rely on a
     * per-instance lock: an atomic counter keeps the generated names unique.
     */
    private static final AtomicInteger threadId = new AtomicInteger();

    /* An Executor that allows at most one executing and one pending
       Runnable.  It uses at most one thread -- as soon as there is
       no pending Runnable the thread can exit.  Another thread is
       created as soon as there is a new pending Runnable.  This
       Executor is adapted for use in a situation where each Runnable
       usually schedules up another Runnable.  On return from the
       first one, the second one is immediately executed.  So this
       just becomes a complicated way to write a while loop, but with
       the advantage that you can replace it with another Executor,
       for instance one that you are using to execute a bunch of other
       unrelated work.

       You might expect that a java.util.concurrent.ThreadPoolExecutor
       with corePoolSize=0 and maximumPoolSize=1 would have the same
       behavior, but it does not.  A ThreadPoolExecutor only creates
       a new thread when a new task is submitted and the number of
       existing threads is < corePoolSize.  This can never happen when
       corePoolSize=0, so new threads are never created.  Surprising,
       but there you are.
    */
    private static class LinearExecutor implements Executor {
        /**
         * Stores {@code command} as the single pending task and starts a
         * daemon worker thread if none is currently alive.
         *
         * @throws IllegalArgumentException if a command is already pending
         */
        @Override
        public synchronized void execute(Runnable command) {
            if (this.command != null)
                throw new IllegalArgumentException("More than one command");
            this.command = command;
            if (thread == null) {
                thread = new Thread(
                    null,
                    () -> {
                        while (true) {
                            Runnable r;
                            synchronized (LinearExecutor.this) {
                                if (LinearExecutor.this.command == null) {
                                    // nothing pending: let the worker die
                                    thread = null;
                                    return;
                                } else {
                                    r = LinearExecutor.this.command;
                                    LinearExecutor.this.command = null;
                                }
                            }
                            // run outside the lock so that the task can
                            // schedule its successor through execute()
                            r.run();
                        }
                    },
                    "ClientNotifForwarder-" + threadId.incrementAndGet(),
                    0,
                    false
                );
                thread.setDaemon(true);
                thread.start();
            }
        }

        private Runnable command;
        private Thread thread;
    }

    /**
     * Creates a forwarder.
     *
     * @param defaultClassLoader class loader installed as the context class
     *        loader of the fetching thread while it runs; may be null, in
     *        which case the context class loader is left untouched
     * @param env connector environment.  The fetch limits are read through
     *        {@code EnvHelp}; the optional entry
     *        {@code "jmx.remote.x.fetch.notifications.executor"} supplies the
     *        {@code Executor} used to run the fetch loop.  A null map is
     *        treated here as "no executor supplied" (NOTE(review): this
     *        assumes the {@code EnvHelp} accessors also accept null -- confirm).
     * @throws ClassCastException if the executor entry is not an
     *         {@code Executor}
     */
    public ClientNotifForwarder(ClassLoader defaultClassLoader, Map<String, ?> env) {
        maxNotifications = EnvHelp.getMaxFetchNotifNumber(env);
        timeout = EnvHelp.getFetchTimeout(env);

        /* You can supply an Executor in which the remote call to
           fetchNotifications will be made.  The Executor's execute
           method reschedules another task, so you must not use
           an Executor that executes tasks in the caller's thread.  */
        Executor ex = (env == null)
            ? null
            : (Executor) env.get("jmx.remote.x.fetch.notifications.executor");
        if (ex == null)
            ex = new LinearExecutor();
        else if (logger.traceOn())
            logger.trace("ClientNotifForwarder", "executor is " + ex);

        this.defaultClassLoader = defaultClassLoader;
        this.executor = ex;
        this.acc = AccessController.getContext();
    }

    /**
     * Called to fetch notifications from a server.
     */
    protected abstract NotificationResult fetchNotifs(long clientSequenceNumber,
                                                      int maxNotifications,
                                                      long timeout)
            throws IOException, ClassNotFoundException;

    /**
     * Registers the listener used to learn about MBean unregistrations.
     *
     * @return the listener ID; notifications carrying this ID are treated by
     *         the fetch loop as MBean server notifications, not dispatched
     */
    protected abstract Integer addListenerForMBeanRemovedNotif()
            throws IOException, InstanceNotFoundException;

    /**
     * Removes the listener previously registered by
     * {@link #addListenerForMBeanRemovedNotif()}.
     */
    protected abstract void removeListenerForMBeanRemovedNotif(Integer id)
            throws IOException, InstanceNotFoundException,
                   ListenerNotFoundException;

    /**
     * Used to send out a notification about lost notifs
     */
    protected abstract void lostNotifs(String message, long number);


    /**
     * Records a listener under {@code listenerID} and makes sure that the
     * fetching loop is running.
     *
     * @throws IOException if this forwarder has been terminated, if the
     *         initial fetch fails, or if the calling thread is interrupted
     *         while waiting for a state change
     */
    public synchronized void addNotificationListener(Integer listenerID,
                                        ObjectName name,
                                        NotificationListener listener,
                                        NotificationFilter filter,
                                        Object handback,
                                        Subject delegationSubject)
            throws IOException, InstanceNotFoundException {

        if (logger.traceOn()) {
            logger.trace("addNotificationListener",
                         "Add the listener "+listener+" at "+name);
        }

        infoList.put(listenerID,
                     new ClientListenerInfo(listenerID,
                                            name,
                                            listener,
                                            filter,
                                            handback,
                                            delegationSubject));


        init(false);
    }

    /**
     * Removes every registration of {@code listener} at {@code name}.
     *
     * @return the IDs of the removed registrations
     * @throws ListenerNotFoundException if no registration matched
     * @throws IOException if this forwarder is terminated or the calling
     *         thread is interrupted while waiting for a reconnection to end
     */
    public synchronized Integer[]
        removeNotificationListener(ObjectName name,
                                   NotificationListener listener)
        throws ListenerNotFoundException, IOException {

        beforeRemove();

        if (logger.traceOn()) {
            logger.trace("removeNotificationListener",
                         "Remove the listener "+listener+" from "+name);
        }

        List<Integer> ids = new ArrayList<>();
        // iterate over a snapshot: infoList is modified inside the loop
        List<ClientListenerInfo> values = new ArrayList<>(infoList.values());
        for (int i = values.size() - 1; i >= 0; i--) {
            ClientListenerInfo li = values.get(i);

            if (li.sameAs(name, listener)) {
                ids.add(li.getListenerID());

                infoList.remove(li.getListenerID());
            }
        }

        if (ids.isEmpty())
            throw new ListenerNotFoundException("Listener not found");

        return ids.toArray(new Integer[0]);
    }

    /**
     * Removes a single registration matching {@code name}, {@code listener},
     * {@code filter} and {@code handback}.
     *
     * @return the ID of the removed registration
     * @throws ListenerNotFoundException if no registration matched
     * @throws IOException if this forwarder is terminated or the calling
     *         thread is interrupted while waiting for a reconnection to end
     */
    public synchronized Integer
        removeNotificationListener(ObjectName name,
                                   NotificationListener listener,
                                   NotificationFilter filter,
                                   Object handback)
            throws ListenerNotFoundException, IOException {

        if (logger.traceOn()) {
            logger.trace("removeNotificationListener",
                         "Remove the listener "+listener+" from "+name);
        }

        beforeRemove();

        Integer id = null;

        // iterate over a snapshot: infoList is modified inside the loop
        List<ClientListenerInfo> values = new ArrayList<>(infoList.values());
        for (int i = values.size() - 1; i >= 0; i--) {
            ClientListenerInfo li = values.get(i);
            if (li.sameAs(name, listener, filter, handback)) {
                id = li.getListenerID();

                infoList.remove(id);

                break;
            }
        }

        if (id == null)
            throw new ListenerNotFoundException("Listener not found");

        return id;
    }

    /**
     * Removes every listener registered at {@code name}.  Unlike the other
     * overloads this one neither waits for a pending reconnection nor fails
     * when nothing matches.
     *
     * @return the IDs of the removed registrations, possibly empty
     */
    public synchronized Integer[] removeNotificationListener(ObjectName name) {
        if (logger.traceOn()) {
            logger.trace("removeNotificationListener",
                         "Remove all listeners registered at "+name);
        }

        List<Integer> ids = new ArrayList<>();

        // iterate over a snapshot: infoList is modified inside the loop
        List<ClientListenerInfo> values = new ArrayList<>(infoList.values());
        for (int i = values.size() - 1; i >= 0; i--) {
            ClientListenerInfo li = values.get(i);
            if (li.sameAs(name)) {
                ids.add(li.getListenerID());

                infoList.remove(li.getListenerID());
            }
        }

        return ids.toArray(new Integer[0]);
    }

    /**
     * Called when a connector is doing reconnection. Like <code>postReconnection</code>,
     * this method is intended to be called only by a client connector:
     * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
     * Calling this method sets the flag beingReconnected to <code>true</code>
     * and empties the listener list, so the thread used to fetch notifs will
     * stop; a new thread can be created only after the method
     * <code>postReconnection</code> is called.
     *
     * It is the caller's responsibility to not re-call this method before calling
     * <code>postReconnection</code>.
     *
     * @return the listeners that were registered when the call was made
     * @throws IOException if this forwarder is terminated or a reconnection
     *         is already in progress
     */
    public synchronized ClientListenerInfo[] preReconnection() throws IOException {
        if (state == TERMINATED || beingReconnected) { // should never
            throw new IOException("Illegal state.");
        }

        final ClientListenerInfo[] tmp =
            infoList.values().toArray(new ClientListenerInfo[0]);


        beingReconnected = true;

        infoList.clear();

        return tmp;
    }

    /**
     * Called after reconnection is finished.
     * This method is intended to be called only by a client connector:
     * <code>RMIConnector</code> and <code>ClientIntermediary</code>.
     *
     * @param listenerInfos the listeners to put back into the listener list
     * @throws IOException if the calling thread is interrupted while waiting
     *         for the fetching thread to stop, or if restarting the fetch
     *         loop fails
     */
    public synchronized void postReconnection(ClientListenerInfo[] listenerInfos)
        throws IOException {

        if (state == TERMINATED) {
            return;
        }

        while (state == STOPPING) {
            try {
                wait();
            } catch (InterruptedException ire) {
                throw interruptedIOException(ire);
            }
        }

        final boolean trace = logger.traceOn();
        final int len   = listenerInfos.length;

        for (int i = 0; i < len; i++) {
            if (trace) {
                logger.trace("addNotificationListeners",
                             "Add a listener at "+
                             listenerInfos[i].getListenerID());
            }

            infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]);
        }

        beingReconnected = false;
        notifyAll();

        if (currentFetchThread == Thread.currentThread() ||
              state == STARTING || state == STARTED) { // doing or waiting reconnection
              // only update mbeanRemovedNotifID
            try {
                mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
            } catch (Exception e) {
                final String msg =
                    "Failed to register a listener to the mbean " +
                    "server: the client will not do clean when an MBean " +
                    "is unregistered";
                if (logger.traceOn()) {
                    logger.trace("init", msg, e);
                }
            }
        } else {
            while (state == STOPPING) {
                try {
                    wait();
                } catch (InterruptedException ire) {
                    throw interruptedIOException(ire);
                }
            }

            if (listenerInfos.length > 0) { // old listeners are re-added
                init(true); // not update clientSequenceNumber
            } else if (infoList.size() > 0) { // only new listeners added during reconnection
                init(false); // need update clientSequenceNumber
            }
        }
    }

    /**
     * Moves this forwarder to the {@code TERMINATED} state.  Calling it
     * again afterwards has no effect.
     */
    public synchronized void terminate() {
        if (state == TERMINATED) {
            return;
        }

        if (logger.traceOn()) {
            logger.trace("terminate", "Terminating...");
        }

        if (state == STARTED) {
           infoList.clear();
        }

        setState(TERMINATED);
    }


    // -------------------------------------------------
    // private classes
    // -------------------------------------------------
    //

    /**
     * One iteration of the fetch loop: fetches a batch of notifications,
     * dispatches it, then either reschedules itself on the executor or marks
     * the loop as stopped.
     */
    private class NotifFetcher implements Runnable {

        private volatile boolean alreadyLogged = false;

        private void logOnce(String msg, SecurityException x) {
            if (alreadyLogged) return;
            // Log only once.
            logger.config("setContextClassLoader",msg);
            if (x != null) logger.fine("setContextClassLoader", x);
            alreadyLogged = true;
        }

        // Set new context class loader, returns previous one.
        private ClassLoader setContextClassLoader(final ClassLoader loader) {
            final AccessControlContext ctxt = ClientNotifForwarder.this.acc;
            // if ctxt is null, log a config message and throw a
            // SecurityException.
            if (ctxt == null) {
                logOnce("AccessControlContext must not be null.",null);
                throw new SecurityException("AccessControlContext must not be null");
            }
            return AccessController.doPrivileged(
                new PrivilegedAction<ClassLoader>() {
                    @Override
                    public ClassLoader run() {
                        try {
                            // get context class loader - may throw
                            // SecurityException - though unlikely.
                            final ClassLoader previous =
                                Thread.currentThread().getContextClassLoader();

                            // if nothing needs to be done, break here...
                            if (loader == previous) return previous;

                            // reset context class loader - may throw
                            // SecurityException
                            Thread.currentThread().setContextClassLoader(loader);
                            return previous;
                        } catch (SecurityException x) {
                            logOnce("Permission to set ContextClassLoader missing. " +
                                    "Notifications will not be dispatched. " +
                                    "Please check your Java policy configuration: " +
                                    x, x);
                            throw x;
                        }
                    }
                }, ctxt);
        }

        /**
         * Runs one fetch iteration with the default class loader (if any)
         * installed as context class loader, restoring the previous loader
         * afterwards.
         */
        @Override
        public void run() {
            final ClassLoader previous;
            if (defaultClassLoader != null) {
                previous = setContextClassLoader(defaultClassLoader);
            } else {
                previous = null;
            }
            try {
                doRun();
            } finally {
                if (defaultClassLoader != null) {
                    setContextClassLoader(previous);
                }
            }
        }

        private void doRun() {
            synchronized (ClientNotifForwarder.this) {
                currentFetchThread = Thread.currentThread();

                if (state == STARTING) {
                    setState(STARTED);
                }
            }


            NotificationResult nr = null;
            if (!shouldStop() && (nr = fetchNotifs()) != null) {
                // nr == null means got exception

                final TargetedNotification[] notifs =
                    nr.getTargetedNotifications();
                final int len = notifs.length;
                final Map<Integer, ClientListenerInfo> listeners;
                final Integer myListenerID;

                long missed = 0;

                synchronized(ClientNotifForwarder.this) {
                    // check sequence number.
                    //
                    if (clientSequenceNumber >= 0) {
                        missed = nr.getEarliestSequenceNumber() -
                            clientSequenceNumber;
                    }

                    clientSequenceNumber = nr.getNextSequenceNumber();

                    // snapshot of the target listeners, so that dispatching
                    // can happen after the lock has been released
                    listeners = new HashMap<>();

                    for (int i = 0 ; i < len ; i++) {
                        final TargetedNotification tn = notifs[i];
                        final Integer listenerID = tn.getListenerID();

                        // check if an mbean unregistration notif
                        if (!listenerID.equals(mbeanRemovedNotifID)) {
                            final ClientListenerInfo li = infoList.get(listenerID);
                            if (li != null) {
                                listeners.put(listenerID, li);
                            }
                            continue;
                        }
                        final Notification notif = tn.getNotification();
                        final String unreg =
                            MBeanServerNotification.UNREGISTRATION_NOTIFICATION;
                        if (notif instanceof MBeanServerNotification &&
                            notif.getType().equals(unreg)) {

                            MBeanServerNotification mbsn =
                                (MBeanServerNotification) notif;
                            ObjectName name = mbsn.getMBeanName();

                            removeNotificationListener(name);
                        }
                    }
                    myListenerID = mbeanRemovedNotifID;
                }

                if (missed > 0) {
                    final String msg =
                        "May have lost up to " + missed +
                        " notification" + (missed == 1 ? "" : "s");
                    lostNotifs(msg, missed);
                    logger.trace("NotifFetcher.run", msg);
                }

                // forward
                for (int i = 0 ; i < len ; i++) {
                    final TargetedNotification tn = notifs[i];
                    dispatchNotification(tn,myListenerID,listeners);
                }
            }

            synchronized (ClientNotifForwarder.this) {
                currentFetchThread = null;
            }

            if (nr == null) {
                if (logger.traceOn()) {
                    logger.trace("NotifFetcher-run",
                            "Recieved null object as notifs, stops fetching because the "
                                    + "notification server is terminated.");
                }
            }
            if (nr == null || shouldStop()) {
                // tell that the thread is REALLY stopped
                setState(STOPPED);

                try {
                      removeListenerForMBeanRemovedNotif(mbeanRemovedNotifID);
                } catch (Exception e) {
                    if (logger.traceOn()) {
                        logger.trace("NotifFetcher-run",
                                "removeListenerForMBeanRemovedNotif", e);
                    }
                }
            } else {
                try {
                    executor.execute(this);
                } catch (Exception e) {
                    if (isRejectedExecutionException(e)) {
                        // We reached here because the executor was shutdown.
                        // If executor was supplied by client, then it was shutdown
                        // abruptly or JMXConnector was shutdown along with executor
                        // while this thread was suspended just before the
                        // executor.execute(this) call above.
                        if (!(executor instanceof LinearExecutor)) {
                            // Spawn new executor that will do cleanup if JMXConnector is closed
                            // or keep notif system running otherwise
                            executor = new LinearExecutor();
                            executor.execute(this);
                        }
                    } else {
                        throw e;
                    }
                }
            }
        }

        // Tells whether e, or any exception in its cause chain, is a
        // RejectedExecutionException.
        private boolean isRejectedExecutionException(Exception e) {
            Throwable cause = e;
            while (cause != null) {
                if (cause instanceof RejectedExecutionException) {
                    return true;
                }
                cause = cause.getCause();
            }
            return false;
        }

        /**
         * Delivers one notification to its listener.  Notifications addressed
         * to {@code myListenerID}, or to an ID missing from {@code listeners},
         * are not delivered.  A RuntimeException thrown by the listener is
         * logged and not propagated.
         */
        void dispatchNotification(TargetedNotification tn,
                                  Integer myListenerID,
                                  Map<Integer, ClientListenerInfo> listeners) {
            final Notification notif = tn.getNotification();
            final Integer listenerID = tn.getListenerID();

            if (listenerID.equals(myListenerID)) return;
            final ClientListenerInfo li = listeners.get(listenerID);

            if (li == null) {
                logger.trace("NotifFetcher.dispatch",
                             "Listener ID not in map");
                return;
            }

            NotificationListener l = li.getListener();
            Object h = li.getHandback();
            try {
                l.handleNotification(notif, h);
            } catch (RuntimeException e) {
                final String msg =
                    "Failed to forward a notification " +
                    "to a listener";
                logger.trace("NotifFetcher-run", msg, e);
            }

        }

        /**
         * Fetches the next batch of notifications.
         *
         * @return the batch, or null when fetching must stop (an IOException
         *         occurred, or the one-by-one fallback gave up)
         */
        private NotificationResult fetchNotifs() {
            try {
                NotificationResult nr = ClientNotifForwarder.this.
                    fetchNotifs(clientSequenceNumber,maxNotifications,
                                timeout);

                if (logger.traceOn()) {
                    logger.trace("NotifFetcher-run",
                                 "Got notifications from the server: "+nr);
                }

                return nr;
            } catch (ClassNotFoundException | NotSerializableException e) {
                logger.trace("NotifFetcher.fetchNotifs", e);
                return fetchOneNotif();
            } catch (IOException ioe) {
                if (!shouldStop()) {
                    logger.error("NotifFetcher-run",
                                 "Failed to fetch notification, " +
                                 "stopping thread. Error is: " + ioe, ioe);
                    logger.debug("NotifFetcher-run",ioe);
                }

                // no more fetching
                return null;
            }
        }

        /* Fetch one notification when we suspect that it might be a
           notification that we can't deserialize (because of a
           missing class).  First we ask for 0 notifications with 0
           timeout.  This allows us to skip sequence numbers for
           notifications that don't match our filters.  Then we ask
           for one notification.  If that produces a
           ClassNotFoundException, NotSerializableException or
           UnmarshalException, we increase our sequence number and ask again.
           Eventually we will either get a successful notification, or a
           return with 0 notifications.  In either case we can return a
           NotificationResult.  This algorithm works (albeit less
           well) even if the server implementation doesn't optimize a
           request for 0 notifications to skip sequence numbers for
           notifications that don't match our filters.

           If we had at least one
           ClassNotFoundException/NotSerializableException/UnmarshalException,
           then we must emit a JMXConnectionNotification.LOST_NOTIFS.
        */
        private NotificationResult fetchOneNotif() {
            ClientNotifForwarder cnf = ClientNotifForwarder.this;

            long startSequenceNumber = clientSequenceNumber;

            int notFoundCount = 0;

            NotificationResult result = null;
            long firstEarliest = -1;

            while (result == null && !shouldStop()) {
                NotificationResult nr;

                try {
                    // 0 notifs to update startSequenceNumber
                    nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);
                } catch (ClassNotFoundException e) {
                    logger.warning("NotifFetcher.fetchOneNotif",
                                   "Impossible exception: " + e);
                    logger.debug("NotifFetcher.fetchOneNotif",e);
                    return null;
                } catch (IOException e) {
                    if (!shouldStop())
                        logger.trace("NotifFetcher.fetchOneNotif", e);
                    return null;
                }

                if (shouldStop() || nr == null)
                    return null;

                startSequenceNumber = nr.getNextSequenceNumber();
                if (firstEarliest < 0)
                    firstEarliest = nr.getEarliestSequenceNumber();

                try {
                    // 1 notif to skip possible missing class
                    result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);
                } catch (ClassNotFoundException | NotSerializableException e) {
                    logger.warning("NotifFetcher.fetchOneNotif",
                                   "Failed to deserialize a notification: "+e.toString());
                    if (logger.traceOn()) {
                        logger.trace("NotifFetcher.fetchOneNotif",
                                     "Failed to deserialize a notification.", e);
                    }

                    notFoundCount++;
                    startSequenceNumber++;
                } catch (Exception e) {
                    if (!shouldStop())
                        logger.trace("NotifFetcher.fetchOneNotif", e);
                    return null;
                }
            }

            if (notFoundCount > 0) {
                final String msg =
                    "Dropped " + notFoundCount + " notification" +
                    (notFoundCount == 1 ? "" : "s") +
                    " because classes were missing locally or incompatible";
                lostNotifs(msg, notFoundCount);
                // Even if result.getEarliestSequenceNumber() is now greater than
                // it was initially, meaning some notifs have been dropped
                // from the buffer, we don't want the caller to see that
                // because it is then likely to renotify about the lost notifs.
                // So we put back the first value of earliestSequenceNumber
                // that we saw.
                if (result != null) {
                    result = new NotificationResult(
                            firstEarliest, result.getNextSequenceNumber(),
                            result.getTargetedNotifications());
                }
            }

            return result;
        }

        /**
         * Tells whether the fetch loop must stop.  As a side effect, moves
         * the forwarder from STARTED to STOPPING when no listener is left.
         */
        private boolean shouldStop() {
            synchronized (ClientNotifForwarder.this) {
                if (state != STARTED) {
                    return true;
                } else if (infoList.size() == 0) {
                    // no more listener, stop fetching
                    setState(STOPPING);

                    return true;
                }

                return false;
            }
        }
    }


// -------------------------------------------------
// private methods
// -------------------------------------------------

    /*
     * Changes the state and wakes up every thread waiting on this object.
     * Once TERMINATED, the state never changes again.
     */
    private synchronized void setState(int newState) {
        if (state == TERMINATED) {
            return;
        }

        state = newState;
        this.notifyAll();
    }

    /*
     * Converts an interruption of one of the wait() calls of this class into
     * the IOException reported to callers.  The interrupt status of the
     * current thread is restored first, so that code further up the stack can
     * still observe the interruption.
     */
    private static IOException interruptedIOException(InterruptedException ire) {
        Thread.currentThread().interrupt();
        IOException ioe = new IOException(ire.toString());
        EnvHelp.initCause(ioe, ire);
        return ioe;
    }

    /*
     * Called to decide whether need to start a thread for fetching notifs.
     * <P>The parameter reconnected will decide whether to initialize the clientSequenceNumber,
     * initializing the clientSequenceNumber means to ignore all notifications arrived before.
     * If it is reconnected, we will not initialize in order to get all notifications arrived
     * during the reconnection. It may cause the newly registered listeners to receive some
     * notifications arrived before its registration.
     */
    private synchronized void init(boolean reconnected) throws IOException {
        switch (state) {
        case STARTED:
            return;
        case STARTING:
            return;
        case TERMINATED:
            throw new IOException("The ClientNotifForwarder has been terminated.");
        case STOPPING:
            if (beingReconnected) {
                // wait for another thread to do, which is doing reconnection
                return;
            }

            while (state == STOPPING) { // make sure only one fetching thread.
                try {
                    wait();
                } catch (InterruptedException ire) {
                    throw interruptedIOException(ire);
                }
            }

            // re-call this method to check the state again,
            // the state can be other value like TERMINATED.
            init(reconnected);

            return;
        case STOPPED:
            if (beingReconnected) {
                // wait for another thread to do, which is doing reconnection
                return;
            }

            if (logger.traceOn()) {
                logger.trace("init", "Initializing...");
            }

            // init the clientSequenceNumber if not reconnected.
            if (!reconnected) {
                try {
                    NotificationResult nr = fetchNotifs(-1, 0, 0);

                    if (state != STOPPED) { // JDK-8038940
                                            // reconnection must happen during
                                            // fetchNotifs(-1, 0, 0), and a new
                                            // thread takes over the fetching job
                        return;
                    }

                    clientSequenceNumber = nr.getNextSequenceNumber();
                } catch (ClassNotFoundException e) {
                    // can't happen
                    logger.warning("init", "Impossible exception: "+ e);
                    logger.debug("init",e);
                }
            }

            // for cleaning
            try {
                mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
            } catch (Exception e) {
                final String msg =
                    "Failed to register a listener to the mbean " +
                    "server: the client will not do clean when an MBean " +
                    "is unregistered";
                if (logger.traceOn()) {
                    logger.trace("init", msg, e);
                }
            }

            setState(STARTING);

            // start fetching
            executor.execute(new NotifFetcher());

            return;
        default:
            // should not
            throw new IOException("Unknown state.");
        }
    }

    /**
     * Important: should not remove a listener during reconnection, the reconnection
     * needs to change the listener list and that will possibly make removal fail.
     * This method therefore waits until the reconnection is over.
     *
     * @throws IOException if this forwarder is terminated or the calling
     *         thread is interrupted while waiting
     */
    private synchronized void beforeRemove() throws IOException {
        while (beingReconnected) {
            if (state == TERMINATED) {
                throw new IOException("Terminated.");
            }

            try {
                wait();
            } catch (InterruptedException ire) {
                throw interruptedIOException(ire);
            }
        }

        if (state == TERMINATED) {
            throw new IOException("Terminated.");
        }
    }

// -------------------------------------------------
// private variables
// -------------------------------------------------

    private final ClassLoader defaultClassLoader;
    // not final: replaced by a LinearExecutor when a user-supplied executor
    // rejects the fetch task
    private Executor executor;

    private final Map<Integer, ClientListenerInfo> infoList =
            new HashMap<>();

    // notif stuff
    private long clientSequenceNumber = -1;
    private final int maxNotifications;
    private final long timeout;
    private Integer mbeanRemovedNotifID = null;
    private Thread currentFetchThread;

    // state
    /**
     * This state means that a thread is being created for fetching and forwarding notifications.
     */
    private static final int STARTING = 0;

    /**
     * This state tells that a thread has been started for fetching and forwarding notifications.
     */
    private static final int STARTED = 1;

    /**
     * This state means that the fetching thread is informed to stop.
     */
    private static final int STOPPING = 2;

    /**
     * This state means that the fetching thread is already stopped.
     */
    private static final int STOPPED = 3;

    /**
     * This state means that this object is terminated and no more thread will be created
     * for fetching notifications.
     */
    private static final int TERMINATED = 4;

    private int state = STOPPED;

    /**
     * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
     * is doing reconnection.
     * This variable will be set to true by the method <code>preReconnection</code>, and set
     * to false by <code>postReconnection</code>.
     * When beingReconnected == true, no thread will be created for fetching notifications.
     */
    private boolean beingReconnected = false;

    private static final ClassLogger logger =
        new ClassLogger("javax.management.remote.misc",
                        "ClientNotifForwarder");
}