1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25/* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36package java.util.concurrent.locks; 37 38import java.lang.invoke.MethodHandles; 39import java.lang.invoke.VarHandle; 40import java.util.ArrayList; 41import java.util.Collection; 42import java.util.Date; 43import java.util.concurrent.TimeUnit; 44import java.util.concurrent.locks.AbstractQueuedSynchronizer.Node; 45 46/** 47 * A version of {@link AbstractQueuedSynchronizer} in 48 * which synchronization state is maintained as a {@code long}. 49 * This class has exactly the same structure, properties, and methods 50 * as {@code AbstractQueuedSynchronizer} with the exception 51 * that all state-related parameters and results are defined 52 * as {@code long} rather than {@code int}. This class 53 * may be useful when creating synchronizers such as 54 * multilevel locks and barriers that require 55 * 64 bits of state. 56 * 57 * <p>See {@link AbstractQueuedSynchronizer} for usage 58 * notes and examples. 59 * 60 * @since 1.6 61 * @author Doug Lea 62 */ 63public abstract class AbstractQueuedLongSynchronizer 64 extends AbstractOwnableSynchronizer 65 implements java.io.Serializable { 66 67 private static final long serialVersionUID = 7373984972572414692L; 68 69 /* 70 To keep sources in sync, the remainder of this source file is 71 exactly cloned from AbstractQueuedSynchronizer, replacing class 72 name and changing ints related with sync state to longs. Please 73 keep it that way. 74 */ 75 76 /** 77 * Creates a new {@code AbstractQueuedLongSynchronizer} instance 78 * with initial synchronization state of zero. 79 */ 80 protected AbstractQueuedLongSynchronizer() { } 81 82 /** 83 * Head of the wait queue, lazily initialized. Except for 84 * initialization, it is modified only via method setHead. Note: 85 * If head exists, its waitStatus is guaranteed not to be 86 * CANCELLED. 87 */ 88 private transient volatile Node head; 89 90 /** 91 * Tail of the wait queue, lazily initialized. Modified only via 92 * method enq to add new wait node. 93 */ 94 private transient volatile Node tail; 95 96 /** 97 * The synchronization state. 98 */ 99 private volatile long state; 100 101 /** 102 * Returns the current value of synchronization state. 103 * This operation has memory semantics of a {@code volatile} read. 104 * @return current state value 105 */ 106 protected final long getState() { 107 return state; 108 } 109 110 /** 111 * Sets the value of synchronization state. 112 * This operation has memory semantics of a {@code volatile} write. 113 * @param newState the new state value 114 */ 115 protected final void setState(long newState) { 116 // See JDK-8180620: Clarify VarHandle mixed-access subtleties 117 STATE.setVolatile(this, newState); 118 } 119 120 /** 121 * Atomically sets synchronization state to the given updated 122 * value if the current state value equals the expected value. 123 * This operation has memory semantics of a {@code volatile} read 124 * and write. 125 * 126 * @param expect the expected value 127 * @param update the new value 128 * @return {@code true} if successful. False return indicates that the actual 129 * value was not equal to the expected value. 130 */ 131 protected final boolean compareAndSetState(long expect, long update) { 132 return STATE.compareAndSet(this, expect, update); 133 } 134 135 // Queuing utilities 136 137 /** 138 * The number of nanoseconds for which it is faster to spin 139 * rather than to use timed park. A rough estimate suffices 140 * to improve responsiveness with very short timeouts. 141 */ 142 static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L; 143 144 /** 145 * Inserts node into queue, initializing if necessary. See picture above. 146 * @param node the node to insert 147 * @return node's predecessor 148 */ 149 private Node enq(Node node) { 150 for (;;) { 151 Node oldTail = tail; 152 if (oldTail != null) { 153 node.setPrevRelaxed(oldTail); 154 if (compareAndSetTail(oldTail, node)) { 155 oldTail.next = node; 156 return oldTail; 157 } 158 } else { 159 initializeSyncQueue(); 160 } 161 } 162 } 163 164 /** 165 * Creates and enqueues node for current thread and given mode. 166 * 167 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 168 * @return the new node 169 */ 170 private Node addWaiter(Node mode) { 171 Node node = new Node(mode); 172 173 for (;;) { 174 Node oldTail = tail; 175 if (oldTail != null) { 176 node.setPrevRelaxed(oldTail); 177 if (compareAndSetTail(oldTail, node)) { 178 oldTail.next = node; 179 return node; 180 } 181 } else { 182 initializeSyncQueue(); 183 } 184 } 185 } 186 187 /** 188 * Sets head of queue to be node, thus dequeuing. Called only by 189 * acquire methods. Also nulls out unused fields for sake of GC 190 * and to suppress unnecessary signals and traversals. 191 * 192 * @param node the node 193 */ 194 private void setHead(Node node) { 195 head = node; 196 node.thread = null; 197 node.prev = null; 198 } 199 200 /** 201 * Wakes up node's successor, if one exists. 202 * 203 * @param node the node 204 */ 205 private void unparkSuccessor(Node node) { 206 /* 207 * If status is negative (i.e., possibly needing signal) try 208 * to clear in anticipation of signalling. It is OK if this 209 * fails or if status is changed by waiting thread. 210 */ 211 int ws = node.waitStatus; 212 if (ws < 0) 213 node.compareAndSetWaitStatus(ws, 0); 214 215 /* 216 * Thread to unpark is held in successor, which is normally 217 * just the next node. But if cancelled or apparently null, 218 * traverse backwards from tail to find the actual 219 * non-cancelled successor. 220 */ 221 Node s = node.next; 222 if (s == null || s.waitStatus > 0) { 223 s = null; 224 for (Node p = tail; p != node && p != null; p = p.prev) 225 if (p.waitStatus <= 0) 226 s = p; 227 } 228 if (s != null) 229 LockSupport.unpark(s.thread); 230 } 231 232 /** 233 * Release action for shared mode -- signals successor and ensures 234 * propagation. (Note: For exclusive mode, release just amounts 235 * to calling unparkSuccessor of head if it needs signal.) 236 */ 237 private void doReleaseShared() { 238 /* 239 * Ensure that a release propagates, even if there are other 240 * in-progress acquires/releases. This proceeds in the usual 241 * way of trying to unparkSuccessor of head if it needs 242 * signal. But if it does not, status is set to PROPAGATE to 243 * ensure that upon release, propagation continues. 244 * Additionally, we must loop in case a new node is added 245 * while we are doing this. Also, unlike other uses of 246 * unparkSuccessor, we need to know if CAS to reset status 247 * fails, if so rechecking. 248 */ 249 for (;;) { 250 Node h = head; 251 if (h != null && h != tail) { 252 int ws = h.waitStatus; 253 if (ws == Node.SIGNAL) { 254 if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) 255 continue; // loop to recheck cases 256 unparkSuccessor(h); 257 } 258 else if (ws == 0 && 259 !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) 260 continue; // loop on failed CAS 261 } 262 if (h == head) // loop if head changed 263 break; 264 } 265 } 266 267 /** 268 * Sets head of queue, and checks if successor may be waiting 269 * in shared mode, if so propagating if either propagate > 0 or 270 * PROPAGATE status was set. 271 * 272 * @param node the node 273 * @param propagate the return value from a tryAcquireShared 274 */ 275 private void setHeadAndPropagate(Node node, long propagate) { 276 Node h = head; // Record old head for check below 277 setHead(node); 278 /* 279 * Try to signal next queued node if: 280 * Propagation was indicated by caller, 281 * or was recorded (as h.waitStatus either before 282 * or after setHead) by a previous operation 283 * (note: this uses sign-check of waitStatus because 284 * PROPAGATE status may transition to SIGNAL.) 285 * and 286 * The next node is waiting in shared mode, 287 * or we don't know, because it appears null 288 * 289 * The conservatism in both of these checks may cause 290 * unnecessary wake-ups, but only when there are multiple 291 * racing acquires/releases, so most need signals now or soon 292 * anyway. 293 */ 294 if (propagate > 0 || h == null || h.waitStatus < 0 || 295 (h = head) == null || h.waitStatus < 0) { 296 Node s = node.next; 297 if (s == null || s.isShared()) 298 doReleaseShared(); 299 } 300 } 301 302 // Utilities for various versions of acquire 303 304 /** 305 * Cancels an ongoing attempt to acquire. 306 * 307 * @param node the node 308 */ 309 private void cancelAcquire(Node node) { 310 // Ignore if node doesn't exist 311 if (node == null) 312 return; 313 314 node.thread = null; 315 316 // Skip cancelled predecessors 317 Node pred = node.prev; 318 while (pred.waitStatus > 0) 319 node.prev = pred = pred.prev; 320 321 // predNext is the apparent node to unsplice. CASes below will 322 // fail if not, in which case, we lost race vs another cancel 323 // or signal, so no further action is necessary. 324 Node predNext = pred.next; 325 326 // Can use unconditional write instead of CAS here. 327 // After this atomic step, other Nodes can skip past us. 328 // Before, we are free of interference from other threads. 329 node.waitStatus = Node.CANCELLED; 330 331 // If we are the tail, remove ourselves. 332 if (node == tail && compareAndSetTail(node, pred)) { 333 pred.compareAndSetNext(predNext, null); 334 } else { 335 // If successor needs signal, try to set pred's next-link 336 // so it will get one. Otherwise wake it up to propagate. 337 int ws; 338 if (pred != head && 339 ((ws = pred.waitStatus) == Node.SIGNAL || 340 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && 341 pred.thread != null) { 342 Node next = node.next; 343 if (next != null && next.waitStatus <= 0) 344 pred.compareAndSetNext(predNext, next); 345 } else { 346 unparkSuccessor(node); 347 } 348 349 node.next = node; // help GC 350 } 351 } 352 353 /** 354 * Checks and updates status for a node that failed to acquire. 355 * Returns true if thread should block. This is the main signal 356 * control in all acquire loops. Requires that pred == node.prev. 357 * 358 * @param pred node's predecessor holding status 359 * @param node the node 360 * @return {@code true} if thread should block 361 */ 362 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 363 int ws = pred.waitStatus; 364 if (ws == Node.SIGNAL) 365 /* 366 * This node has already set status asking a release 367 * to signal it, so it can safely park. 368 */ 369 return true; 370 if (ws > 0) { 371 /* 372 * Predecessor was cancelled. Skip over predecessors and 373 * indicate retry. 374 */ 375 do { 376 node.prev = pred = pred.prev; 377 } while (pred.waitStatus > 0); 378 pred.next = node; 379 } else { 380 /* 381 * waitStatus must be 0 or PROPAGATE. Indicate that we 382 * need a signal, but don't park yet. Caller will need to 383 * retry to make sure it cannot acquire before parking. 384 */ 385 pred.compareAndSetWaitStatus(ws, Node.SIGNAL); 386 } 387 return false; 388 } 389 390 /** 391 * Convenience method to interrupt current thread. 392 */ 393 static void selfInterrupt() { 394 Thread.currentThread().interrupt(); 395 } 396 397 /** 398 * Convenience method to park and then check if interrupted. 399 * 400 * @return {@code true} if interrupted 401 */ 402 private final boolean parkAndCheckInterrupt() { 403 LockSupport.park(this); 404 return Thread.interrupted(); 405 } 406 407 /* 408 * Various flavors of acquire, varying in exclusive/shared and 409 * control modes. Each is mostly the same, but annoyingly 410 * different. Only a little bit of factoring is possible due to 411 * interactions of exception mechanics (including ensuring that we 412 * cancel if tryAcquire throws exception) and other control, at 413 * least not without hurting performance too much. 414 */ 415 416 /** 417 * Acquires in exclusive uninterruptible mode for thread already in 418 * queue. Used by condition wait methods as well as acquire. 419 * 420 * @param node the node 421 * @param arg the acquire argument 422 * @return {@code true} if interrupted while waiting 423 */ 424 final boolean acquireQueued(final Node node, long arg) { 425 try { 426 boolean interrupted = false; 427 for (;;) { 428 final Node p = node.predecessor(); 429 if (p == head && tryAcquire(arg)) { 430 setHead(node); 431 p.next = null; // help GC 432 return interrupted; 433 } 434 if (shouldParkAfterFailedAcquire(p, node) && 435 parkAndCheckInterrupt()) 436 interrupted = true; 437 } 438 } catch (Throwable t) { 439 cancelAcquire(node); 440 throw t; 441 } 442 } 443 444 /** 445 * Acquires in exclusive interruptible mode. 446 * @param arg the acquire argument 447 */ 448 private void doAcquireInterruptibly(long arg) 449 throws InterruptedException { 450 final Node node = addWaiter(Node.EXCLUSIVE); 451 try { 452 for (;;) { 453 final Node p = node.predecessor(); 454 if (p == head && tryAcquire(arg)) { 455 setHead(node); 456 p.next = null; // help GC 457 return; 458 } 459 if (shouldParkAfterFailedAcquire(p, node) && 460 parkAndCheckInterrupt()) 461 throw new InterruptedException(); 462 } 463 } catch (Throwable t) { 464 cancelAcquire(node); 465 throw t; 466 } 467 } 468 469 /** 470 * Acquires in exclusive timed mode. 471 * 472 * @param arg the acquire argument 473 * @param nanosTimeout max wait time 474 * @return {@code true} if acquired 475 */ 476 private boolean doAcquireNanos(long arg, long nanosTimeout) 477 throws InterruptedException { 478 if (nanosTimeout <= 0L) 479 return false; 480 final long deadline = System.nanoTime() + nanosTimeout; 481 final Node node = addWaiter(Node.EXCLUSIVE); 482 try { 483 for (;;) { 484 final Node p = node.predecessor(); 485 if (p == head && tryAcquire(arg)) { 486 setHead(node); 487 p.next = null; // help GC 488 return true; 489 } 490 nanosTimeout = deadline - System.nanoTime(); 491 if (nanosTimeout <= 0L) { 492 cancelAcquire(node); 493 return false; 494 } 495 if (shouldParkAfterFailedAcquire(p, node) && 496 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 497 LockSupport.parkNanos(this, nanosTimeout); 498 if (Thread.interrupted()) 499 throw new InterruptedException(); 500 } 501 } catch (Throwable t) { 502 cancelAcquire(node); 503 throw t; 504 } 505 } 506 507 /** 508 * Acquires in shared uninterruptible mode. 509 * @param arg the acquire argument 510 */ 511 private void doAcquireShared(long arg) { 512 final Node node = addWaiter(Node.SHARED); 513 try { 514 boolean interrupted = false; 515 for (;;) { 516 final Node p = node.predecessor(); 517 if (p == head) { 518 long r = tryAcquireShared(arg); 519 if (r >= 0) { 520 setHeadAndPropagate(node, r); 521 p.next = null; // help GC 522 if (interrupted) 523 selfInterrupt(); 524 return; 525 } 526 } 527 if (shouldParkAfterFailedAcquire(p, node) && 528 parkAndCheckInterrupt()) 529 interrupted = true; 530 } 531 } catch (Throwable t) { 532 cancelAcquire(node); 533 throw t; 534 } 535 } 536 537 /** 538 * Acquires in shared interruptible mode. 539 * @param arg the acquire argument 540 */ 541 private void doAcquireSharedInterruptibly(long arg) 542 throws InterruptedException { 543 final Node node = addWaiter(Node.SHARED); 544 try { 545 for (;;) { 546 final Node p = node.predecessor(); 547 if (p == head) { 548 long r = tryAcquireShared(arg); 549 if (r >= 0) { 550 setHeadAndPropagate(node, r); 551 p.next = null; // help GC 552 return; 553 } 554 } 555 if (shouldParkAfterFailedAcquire(p, node) && 556 parkAndCheckInterrupt()) 557 throw new InterruptedException(); 558 } 559 } catch (Throwable t) { 560 cancelAcquire(node); 561 throw t; 562 } 563 } 564 565 /** 566 * Acquires in shared timed mode. 567 * 568 * @param arg the acquire argument 569 * @param nanosTimeout max wait time 570 * @return {@code true} if acquired 571 */ 572 private boolean doAcquireSharedNanos(long arg, long nanosTimeout) 573 throws InterruptedException { 574 if (nanosTimeout <= 0L) 575 return false; 576 final long deadline = System.nanoTime() + nanosTimeout; 577 final Node node = addWaiter(Node.SHARED); 578 try { 579 for (;;) { 580 final Node p = node.predecessor(); 581 if (p == head) { 582 long r = tryAcquireShared(arg); 583 if (r >= 0) { 584 setHeadAndPropagate(node, r); 585 p.next = null; // help GC 586 return true; 587 } 588 } 589 nanosTimeout = deadline - System.nanoTime(); 590 if (nanosTimeout <= 0L) { 591 cancelAcquire(node); 592 return false; 593 } 594 if (shouldParkAfterFailedAcquire(p, node) && 595 nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 596 LockSupport.parkNanos(this, nanosTimeout); 597 if (Thread.interrupted()) 598 throw new InterruptedException(); 599 } 600 } catch (Throwable t) { 601 cancelAcquire(node); 602 throw t; 603 } 604 } 605 606 // Main exported methods 607 608 /** 609 * Attempts to acquire in exclusive mode. This method should query 610 * if the state of the object permits it to be acquired in the 611 * exclusive mode, and if so to acquire it. 612 * 613 * <p>This method is always invoked by the thread performing 614 * acquire. If this method reports failure, the acquire method 615 * may queue the thread, if it is not already queued, until it is 616 * signalled by a release from some other thread. This can be used 617 * to implement method {@link Lock#tryLock()}. 618 * 619 * <p>The default 620 * implementation throws {@link UnsupportedOperationException}. 621 * 622 * @param arg the acquire argument. This value is always the one 623 * passed to an acquire method, or is the value saved on entry 624 * to a condition wait. The value is otherwise uninterpreted 625 * and can represent anything you like. 626 * @return {@code true} if successful. Upon success, this object has 627 * been acquired. 628 * @throws IllegalMonitorStateException if acquiring would place this 629 * synchronizer in an illegal state. This exception must be 630 * thrown in a consistent fashion for synchronization to work 631 * correctly. 632 * @throws UnsupportedOperationException if exclusive mode is not supported 633 */ 634 protected boolean tryAcquire(long arg) { 635 throw new UnsupportedOperationException(); 636 } 637 638 /** 639 * Attempts to set the state to reflect a release in exclusive 640 * mode. 641 * 642 * <p>This method is always invoked by the thread performing release. 643 * 644 * <p>The default implementation throws 645 * {@link UnsupportedOperationException}. 646 * 647 * @param arg the release argument. This value is always the one 648 * passed to a release method, or the current state value upon 649 * entry to a condition wait. The value is otherwise 650 * uninterpreted and can represent anything you like. 651 * @return {@code true} if this object is now in a fully released 652 * state, so that any waiting threads may attempt to acquire; 653 * and {@code false} otherwise. 654 * @throws IllegalMonitorStateException if releasing would place this 655 * synchronizer in an illegal state. This exception must be 656 * thrown in a consistent fashion for synchronization to work 657 * correctly. 658 * @throws UnsupportedOperationException if exclusive mode is not supported 659 */ 660 protected boolean tryRelease(long arg) { 661 throw new UnsupportedOperationException(); 662 } 663 664 /** 665 * Attempts to acquire in shared mode. This method should query if 666 * the state of the object permits it to be acquired in the shared 667 * mode, and if so to acquire it. 668 * 669 * <p>This method is always invoked by the thread performing 670 * acquire. If this method reports failure, the acquire method 671 * may queue the thread, if it is not already queued, until it is 672 * signalled by a release from some other thread. 673 * 674 * <p>The default implementation throws {@link 675 * UnsupportedOperationException}. 676 * 677 * @param arg the acquire argument. This value is always the one 678 * passed to an acquire method, or is the value saved on entry 679 * to a condition wait. The value is otherwise uninterpreted 680 * and can represent anything you like. 681 * @return a negative value on failure; zero if acquisition in shared 682 * mode succeeded but no subsequent shared-mode acquire can 683 * succeed; and a positive value if acquisition in shared 684 * mode succeeded and subsequent shared-mode acquires might 685 * also succeed, in which case a subsequent waiting thread 686 * must check availability. (Support for three different 687 * return values enables this method to be used in contexts 688 * where acquires only sometimes act exclusively.) Upon 689 * success, this object has been acquired. 690 * @throws IllegalMonitorStateException if acquiring would place this 691 * synchronizer in an illegal state. This exception must be 692 * thrown in a consistent fashion for synchronization to work 693 * correctly. 694 * @throws UnsupportedOperationException if shared mode is not supported 695 */ 696 protected long tryAcquireShared(long arg) { 697 throw new UnsupportedOperationException(); 698 } 699 700 /** 701 * Attempts to set the state to reflect a release in shared mode. 702 * 703 * <p>This method is always invoked by the thread performing release. 704 * 705 * <p>The default implementation throws 706 * {@link UnsupportedOperationException}. 707 * 708 * @param arg the release argument. This value is always the one 709 * passed to a release method, or the current state value upon 710 * entry to a condition wait. The value is otherwise 711 * uninterpreted and can represent anything you like. 712 * @return {@code true} if this release of shared mode may permit a 713 * waiting acquire (shared or exclusive) to succeed; and 714 * {@code false} otherwise 715 * @throws IllegalMonitorStateException if releasing would place this 716 * synchronizer in an illegal state. This exception must be 717 * thrown in a consistent fashion for synchronization to work 718 * correctly. 719 * @throws UnsupportedOperationException if shared mode is not supported 720 */ 721 protected boolean tryReleaseShared(long arg) { 722 throw new UnsupportedOperationException(); 723 } 724 725 /** 726 * Returns {@code true} if synchronization is held exclusively with 727 * respect to the current (calling) thread. This method is invoked 728 * upon each call to a non-waiting {@link ConditionObject} method. 729 * (Waiting methods instead invoke {@link #release}.) 730 * 731 * <p>The default implementation throws {@link 732 * UnsupportedOperationException}. This method is invoked 733 * internally only within {@link ConditionObject} methods, so need 734 * not be defined if conditions are not used. 735 * 736 * @return {@code true} if synchronization is held exclusively; 737 * {@code false} otherwise 738 * @throws UnsupportedOperationException if conditions are not supported 739 */ 740 protected boolean isHeldExclusively() { 741 throw new UnsupportedOperationException(); 742 } 743 744 /** 745 * Acquires in exclusive mode, ignoring interrupts. Implemented 746 * by invoking at least once {@link #tryAcquire}, 747 * returning on success. Otherwise the thread is queued, possibly 748 * repeatedly blocking and unblocking, invoking {@link 749 * #tryAcquire} until success. This method can be used 750 * to implement method {@link Lock#lock}. 751 * 752 * @param arg the acquire argument. This value is conveyed to 753 * {@link #tryAcquire} but is otherwise uninterpreted and 754 * can represent anything you like. 755 */ 756 public final void acquire(long arg) { 757 if (!tryAcquire(arg) && 758 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 759 selfInterrupt(); 760 } 761 762 /** 763 * Acquires in exclusive mode, aborting if interrupted. 764 * Implemented by first checking interrupt status, then invoking 765 * at least once {@link #tryAcquire}, returning on 766 * success. Otherwise the thread is queued, possibly repeatedly 767 * blocking and unblocking, invoking {@link #tryAcquire} 768 * until success or the thread is interrupted. This method can be 769 * used to implement method {@link Lock#lockInterruptibly}. 770 * 771 * @param arg the acquire argument. This value is conveyed to 772 * {@link #tryAcquire} but is otherwise uninterpreted and 773 * can represent anything you like. 774 * @throws InterruptedException if the current thread is interrupted 775 */ 776 public final void acquireInterruptibly(long arg) 777 throws InterruptedException { 778 if (Thread.interrupted()) 779 throw new InterruptedException(); 780 if (!tryAcquire(arg)) 781 doAcquireInterruptibly(arg); 782 } 783 784 /** 785 * Attempts to acquire in exclusive mode, aborting if interrupted, 786 * and failing if the given timeout elapses. Implemented by first 787 * checking interrupt status, then invoking at least once {@link 788 * #tryAcquire}, returning on success. Otherwise, the thread is 789 * queued, possibly repeatedly blocking and unblocking, invoking 790 * {@link #tryAcquire} until success or the thread is interrupted 791 * or the timeout elapses. This method can be used to implement 792 * method {@link Lock#tryLock(long, TimeUnit)}. 793 * 794 * @param arg the acquire argument. This value is conveyed to 795 * {@link #tryAcquire} but is otherwise uninterpreted and 796 * can represent anything you like. 797 * @param nanosTimeout the maximum number of nanoseconds to wait 798 * @return {@code true} if acquired; {@code false} if timed out 799 * @throws InterruptedException if the current thread is interrupted 800 */ 801 public final boolean tryAcquireNanos(long arg, long nanosTimeout) 802 throws InterruptedException { 803 if (Thread.interrupted()) 804 throw new InterruptedException(); 805 return tryAcquire(arg) || 806 doAcquireNanos(arg, nanosTimeout); 807 } 808 809 /** 810 * Releases in exclusive mode. Implemented by unblocking one or 811 * more threads if {@link #tryRelease} returns true. 812 * This method can be used to implement method {@link Lock#unlock}. 813 * 814 * @param arg the release argument. This value is conveyed to 815 * {@link #tryRelease} but is otherwise uninterpreted and 816 * can represent anything you like. 817 * @return the value returned from {@link #tryRelease} 818 */ 819 public final boolean release(long arg) { 820 if (tryRelease(arg)) { 821 Node h = head; 822 if (h != null && h.waitStatus != 0) 823 unparkSuccessor(h); 824 return true; 825 } 826 return false; 827 } 828 829 /** 830 * Acquires in shared mode, ignoring interrupts. Implemented by 831 * first invoking at least once {@link #tryAcquireShared}, 832 * returning on success. Otherwise the thread is queued, possibly 833 * repeatedly blocking and unblocking, invoking {@link 834 * #tryAcquireShared} until success. 835 * 836 * @param arg the acquire argument. This value is conveyed to 837 * {@link #tryAcquireShared} but is otherwise uninterpreted 838 * and can represent anything you like. 839 */ 840 public final void acquireShared(long arg) { 841 if (tryAcquireShared(arg) < 0) 842 doAcquireShared(arg); 843 } 844 845 /** 846 * Acquires in shared mode, aborting if interrupted. Implemented 847 * by first checking interrupt status, then invoking at least once 848 * {@link #tryAcquireShared}, returning on success. Otherwise the 849 * thread is queued, possibly repeatedly blocking and unblocking, 850 * invoking {@link #tryAcquireShared} until success or the thread 851 * is interrupted. 852 * @param arg the acquire argument. 853 * This value is conveyed to {@link #tryAcquireShared} but is 854 * otherwise uninterpreted and can represent anything 855 * you like. 856 * @throws InterruptedException if the current thread is interrupted 857 */ 858 public final void acquireSharedInterruptibly(long arg) 859 throws InterruptedException { 860 if (Thread.interrupted()) 861 throw new InterruptedException(); 862 if (tryAcquireShared(arg) < 0) 863 doAcquireSharedInterruptibly(arg); 864 } 865 866 /** 867 * Attempts to acquire in shared mode, aborting if interrupted, and 868 * failing if the given timeout elapses. Implemented by first 869 * checking interrupt status, then invoking at least once {@link 870 * #tryAcquireShared}, returning on success. Otherwise, the 871 * thread is queued, possibly repeatedly blocking and unblocking, 872 * invoking {@link #tryAcquireShared} until success or the thread 873 * is interrupted or the timeout elapses. 874 * 875 * @param arg the acquire argument. This value is conveyed to 876 * {@link #tryAcquireShared} but is otherwise uninterpreted 877 * and can represent anything you like. 878 * @param nanosTimeout the maximum number of nanoseconds to wait 879 * @return {@code true} if acquired; {@code false} if timed out 880 * @throws InterruptedException if the current thread is interrupted 881 */ 882 public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) 883 throws InterruptedException { 884 if (Thread.interrupted()) 885 throw new InterruptedException(); 886 return tryAcquireShared(arg) >= 0 || 887 doAcquireSharedNanos(arg, nanosTimeout); 888 } 889 890 /** 891 * Releases in shared mode. Implemented by unblocking one or more 892 * threads if {@link #tryReleaseShared} returns true. 893 * 894 * @param arg the release argument. This value is conveyed to 895 * {@link #tryReleaseShared} but is otherwise uninterpreted 896 * and can represent anything you like. 897 * @return the value returned from {@link #tryReleaseShared} 898 */ 899 public final boolean releaseShared(long arg) { 900 if (tryReleaseShared(arg)) { 901 doReleaseShared(); 902 return true; 903 } 904 return false; 905 } 906 907 // Queue inspection methods 908 909 /** 910 * Queries whether any threads are waiting to acquire. Note that 911 * because cancellations due to interrupts and timeouts may occur 912 * at any time, a {@code true} return does not guarantee that any 913 * other thread will ever acquire. 914 * 915 * <p>In this implementation, this operation returns in 916 * constant time. 917 * 918 * @return {@code true} if there may be other threads waiting to acquire 919 */ 920 public final boolean hasQueuedThreads() { 921 return head != tail; 922 } 923 924 /** 925 * Queries whether any threads have ever contended to acquire this 926 * synchronizer; that is, if an acquire method has ever blocked. 927 * 928 * <p>In this implementation, this operation returns in 929 * constant time. 930 * 931 * @return {@code true} if there has ever been contention 932 */ 933 public final boolean hasContended() { 934 return head != null; 935 } 936 937 /** 938 * Returns the first (longest-waiting) thread in the queue, or 939 * {@code null} if no threads are currently queued. 940 * 941 * <p>In this implementation, this operation normally returns in 942 * constant time, but may iterate upon contention if other threads are 943 * concurrently modifying the queue. 944 * 945 * @return the first (longest-waiting) thread in the queue, or 946 * {@code null} if no threads are currently queued 947 */ 948 public final Thread getFirstQueuedThread() { 949 // handle only fast path, else relay 950 return (head == tail) ? null : fullGetFirstQueuedThread(); 951 } 952 953 /** 954 * Version of getFirstQueuedThread called when fastpath fails. 955 */ 956 private Thread fullGetFirstQueuedThread() { 957 /* 958 * The first node is normally head.next. Try to get its 959 * thread field, ensuring consistent reads: If thread 960 * field is nulled out or s.prev is no longer head, then 961 * some other thread(s) concurrently performed setHead in 962 * between some of our reads. We try this twice before 963 * resorting to traversal. 964 */ 965 Node h, s; 966 Thread st; 967 if (((h = head) != null && (s = h.next) != null && 968 s.prev == head && (st = s.thread) != null) || 969 ((h = head) != null && (s = h.next) != null && 970 s.prev == head && (st = s.thread) != null)) 971 return st; 972 973 /* 974 * Head's next field might not have been set yet, or may have 975 * been unset after setHead. So we must check to see if tail 976 * is actually first node. If not, we continue on, safely 977 * traversing from tail back to head to find first, 978 * guaranteeing termination. 979 */ 980 981 Thread firstThread = null; 982 for (Node p = tail; p != null && p != head; p = p.prev) { 983 Thread t = p.thread; 984 if (t != null) 985 firstThread = t; 986 } 987 return firstThread; 988 } 989 990 /** 991 * Returns true if the given thread is currently queued. 992 * 993 * <p>This implementation traverses the queue to determine 994 * presence of the given thread. 995 * 996 * @param thread the thread 997 * @return {@code true} if the given thread is on the queue 998 * @throws NullPointerException if the thread is null 999 */ 1000 public final boolean isQueued(Thread thread) { 1001 if (thread == null) 1002 throw new NullPointerException(); 1003 for (Node p = tail; p != null; p = p.prev) 1004 if (p.thread == thread) 1005 return true; 1006 return false; 1007 } 1008 1009 /** 1010 * Returns {@code true} if the apparent first queued thread, if one 1011 * exists, is waiting in exclusive mode. If this method returns 1012 * {@code true}, and the current thread is attempting to acquire in 1013 * shared mode (that is, this method is invoked from {@link 1014 * #tryAcquireShared}) then it is guaranteed that the current thread 1015 * is not the first queued thread. Used only as a heuristic in 1016 * ReentrantReadWriteLock. 1017 */ 1018 final boolean apparentlyFirstQueuedIsExclusive() { 1019 Node h, s; 1020 return (h = head) != null && 1021 (s = h.next) != null && 1022 !s.isShared() && 1023 s.thread != null; 1024 } 1025 1026 /** 1027 * Queries whether any threads have been waiting to acquire longer 1028 * than the current thread. 1029 * 1030 * <p>An invocation of this method is equivalent to (but may be 1031 * more efficient than): 1032 * <pre> {@code 1033 * getFirstQueuedThread() != Thread.currentThread() 1034 * && hasQueuedThreads()}</pre> 1035 * 1036 * <p>Note that because cancellations due to interrupts and 1037 * timeouts may occur at any time, a {@code true} return does not 1038 * guarantee that some other thread will acquire before the current 1039 * thread. Likewise, it is possible for another thread to win a 1040 * race to enqueue after this method has returned {@code false}, 1041 * due to the queue being empty. 1042 * 1043 * <p>This method is designed to be used by a fair synchronizer to 1044 * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>. 1045 * Such a synchronizer's {@link #tryAcquire} method should return 1046 * {@code false}, and its {@link #tryAcquireShared} method should 1047 * return a negative value, if this method returns {@code true} 1048 * (unless this is a reentrant acquire). For example, the {@code 1049 * tryAcquire} method for a fair, reentrant, exclusive mode 1050 * synchronizer might look like this: 1051 * 1052 * <pre> {@code 1053 * protected boolean tryAcquire(int arg) { 1054 * if (isHeldExclusively()) { 1055 * // A reentrant acquire; increment hold count 1056 * return true; 1057 * } else if (hasQueuedPredecessors()) { 1058 * return false; 1059 * } else { 1060 * // try to acquire normally 1061 * } 1062 * }}</pre> 1063 * 1064 * @return {@code true} if there is a queued thread preceding the 1065 * current thread, and {@code false} if the current thread 1066 * is at the head of the queue or the queue is empty 1067 * @since 1.7 1068 */ 1069 public final boolean hasQueuedPredecessors() { 1070 // The correctness of this depends on head being initialized 1071 // before tail and on head.next being accurate if the current 1072 // thread is first in queue. 1073 Node t = tail; // Read fields in reverse initialization order 1074 Node h = head; 1075 Node s; 1076 return h != t && 1077 ((s = h.next) == null || s.thread != Thread.currentThread()); 1078 } 1079 1080 1081 // Instrumentation and monitoring methods 1082 1083 /** 1084 * Returns an estimate of the number of threads waiting to 1085 * acquire. The value is only an estimate because the number of 1086 * threads may change dynamically while this method traverses 1087 * internal data structures. This method is designed for use in 1088 * monitoring system state, not for synchronization control. 1089 * 1090 * @return the estimated number of threads waiting to acquire 1091 */ 1092 public final int getQueueLength() { 1093 int n = 0; 1094 for (Node p = tail; p != null; p = p.prev) { 1095 if (p.thread != null) 1096 ++n; 1097 } 1098 return n; 1099 } 1100 1101 /** 1102 * Returns a collection containing threads that may be waiting to 1103 * acquire. Because the actual set of threads may change 1104 * dynamically while constructing this result, the returned 1105 * collection is only a best-effort estimate. The elements of the 1106 * returned collection are in no particular order. This method is 1107 * designed to facilitate construction of subclasses that provide 1108 * more extensive monitoring facilities. 1109 * 1110 * @return the collection of threads 1111 */ 1112 public final Collection<Thread> getQueuedThreads() { 1113 ArrayList<Thread> list = new ArrayList<>(); 1114 for (Node p = tail; p != null; p = p.prev) { 1115 Thread t = p.thread; 1116 if (t != null) 1117 list.add(t); 1118 } 1119 return list; 1120 } 1121 1122 /** 1123 * Returns a collection containing threads that may be waiting to 1124 * acquire in exclusive mode. This has the same properties 1125 * as {@link #getQueuedThreads} except that it only returns 1126 * those threads waiting due to an exclusive acquire. 1127 * 1128 * @return the collection of threads 1129 */ 1130 public final Collection<Thread> getExclusiveQueuedThreads() { 1131 ArrayList<Thread> list = new ArrayList<>(); 1132 for (Node p = tail; p != null; p = p.prev) { 1133 if (!p.isShared()) { 1134 Thread t = p.thread; 1135 if (t != null) 1136 list.add(t); 1137 } 1138 } 1139 return list; 1140 } 1141 1142 /** 1143 * Returns a collection containing threads that may be waiting to 1144 * acquire in shared mode. This has the same properties 1145 * as {@link #getQueuedThreads} except that it only returns 1146 * those threads waiting due to a shared acquire. 1147 * 1148 * @return the collection of threads 1149 */ 1150 public final Collection<Thread> getSharedQueuedThreads() { 1151 ArrayList<Thread> list = new ArrayList<>(); 1152 for (Node p = tail; p != null; p = p.prev) { 1153 if (p.isShared()) { 1154 Thread t = p.thread; 1155 if (t != null) 1156 list.add(t); 1157 } 1158 } 1159 return list; 1160 } 1161 1162 /** 1163 * Returns a string identifying this synchronizer, as well as its state. 1164 * The state, in brackets, includes the String {@code "State ="} 1165 * followed by the current value of {@link #getState}, and either 1166 * {@code "nonempty"} or {@code "empty"} depending on whether the 1167 * queue is empty. 1168 * 1169 * @return a string identifying this synchronizer, as well as its state 1170 */ 1171 public String toString() { 1172 return super.toString() 1173 + "[State = " + getState() + ", " 1174 + (hasQueuedThreads() ? "non" : "") + "empty queue]"; 1175 } 1176 1177 1178 // Internal support methods for Conditions 1179 1180 /** 1181 * Returns true if a node, always one that was initially placed on 1182 * a condition queue, is now waiting to reacquire on sync queue. 1183 * @param node the node 1184 * @return true if is reacquiring 1185 */ 1186 final boolean isOnSyncQueue(Node node) { 1187 if (node.waitStatus == Node.CONDITION || node.prev == null) 1188 return false; 1189 if (node.next != null) // If has successor, it must be on queue 1190 return true; 1191 /* 1192 * node.prev can be non-null, but not yet on queue because 1193 * the CAS to place it on queue can fail. So we have to 1194 * traverse from tail to make sure it actually made it. It 1195 * will always be near the tail in calls to this method, and 1196 * unless the CAS failed (which is unlikely), it will be 1197 * there, so we hardly ever traverse much. 1198 */ 1199 return findNodeFromTail(node); 1200 } 1201 1202 /** 1203 * Returns true if node is on sync queue by searching backwards from tail. 1204 * Called only when needed by isOnSyncQueue. 1205 * @return true if present 1206 */ 1207 private boolean findNodeFromTail(Node node) { 1208 // We check for node first, since it's likely to be at or near tail. 1209 // tail is known to be non-null, so we could re-order to "save" 1210 // one null check, but we leave it this way to help the VM. 1211 for (Node p = tail;;) { 1212 if (p == node) 1213 return true; 1214 if (p == null) 1215 return false; 1216 p = p.prev; 1217 } 1218 } 1219 1220 /** 1221 * Transfers a node from a condition queue onto sync queue. 1222 * Returns true if successful. 1223 * @param node the node 1224 * @return true if successfully transferred (else the node was 1225 * cancelled before signal) 1226 */ 1227 final boolean transferForSignal(Node node) { 1228 /* 1229 * If cannot change waitStatus, the node has been cancelled. 1230 */ 1231 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) 1232 return false; 1233 1234 /* 1235 * Splice onto queue and try to set waitStatus of predecessor to 1236 * indicate that thread is (probably) waiting. If cancelled or 1237 * attempt to set waitStatus fails, wake up to resync (in which 1238 * case the waitStatus can be transiently and harmlessly wrong). 1239 */ 1240 Node p = enq(node); 1241 int ws = p.waitStatus; 1242 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) 1243 LockSupport.unpark(node.thread); 1244 return true; 1245 } 1246 1247 /** 1248 * Transfers node, if necessary, to sync queue after a cancelled wait. 1249 * Returns true if thread was cancelled before being signalled. 1250 * 1251 * @param node the node 1252 * @return true if cancelled before the node was signalled 1253 */ 1254 final boolean transferAfterCancelledWait(Node node) { 1255 if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { 1256 enq(node); 1257 return true; 1258 } 1259 /* 1260 * If we lost out to a signal(), then we can't proceed 1261 * until it finishes its enq(). Cancelling during an 1262 * incomplete transfer is both rare and transient, so just 1263 * spin. 1264 */ 1265 while (!isOnSyncQueue(node)) 1266 Thread.yield(); 1267 return false; 1268 } 1269 1270 /** 1271 * Invokes release with current state value; returns saved state. 1272 * Cancels node and throws exception on failure. 1273 * @param node the condition node for this wait 1274 * @return previous sync state 1275 */ 1276 final long fullyRelease(Node node) { 1277 try { 1278 long savedState = getState(); 1279 if (release(savedState)) 1280 return savedState; 1281 throw new IllegalMonitorStateException(); 1282 } catch (Throwable t) { 1283 node.waitStatus = Node.CANCELLED; 1284 throw t; 1285 } 1286 } 1287 1288 // Instrumentation methods for conditions 1289 1290 /** 1291 * Queries whether the given ConditionObject 1292 * uses this synchronizer as its lock. 1293 * 1294 * @param condition the condition 1295 * @return {@code true} if owned 1296 * @throws NullPointerException if the condition is null 1297 */ 1298 public final boolean owns(ConditionObject condition) { 1299 return condition.isOwnedBy(this); 1300 } 1301 1302 /** 1303 * Queries whether any threads are waiting on the given condition 1304 * associated with this synchronizer. Note that because timeouts 1305 * and interrupts may occur at any time, a {@code true} return 1306 * does not guarantee that a future {@code signal} will awaken 1307 * any threads. This method is designed primarily for use in 1308 * monitoring of the system state. 1309 * 1310 * @param condition the condition 1311 * @return {@code true} if there are any waiting threads 1312 * @throws IllegalMonitorStateException if exclusive synchronization 1313 * is not held 1314 * @throws IllegalArgumentException if the given condition is 1315 * not associated with this synchronizer 1316 * @throws NullPointerException if the condition is null 1317 */ 1318 public final boolean hasWaiters(ConditionObject condition) { 1319 if (!owns(condition)) 1320 throw new IllegalArgumentException("Not owner"); 1321 return condition.hasWaiters(); 1322 } 1323 1324 /** 1325 * Returns an estimate of the number of threads waiting on the 1326 * given condition associated with this synchronizer. Note that 1327 * because timeouts and interrupts may occur at any time, the 1328 * estimate serves only as an upper bound on the actual number of 1329 * waiters. This method is designed for use in monitoring system 1330 * state, not for synchronization control. 1331 * 1332 * @param condition the condition 1333 * @return the estimated number of waiting threads 1334 * @throws IllegalMonitorStateException if exclusive synchronization 1335 * is not held 1336 * @throws IllegalArgumentException if the given condition is 1337 * not associated with this synchronizer 1338 * @throws NullPointerException if the condition is null 1339 */ 1340 public final int getWaitQueueLength(ConditionObject condition) { 1341 if (!owns(condition)) 1342 throw new IllegalArgumentException("Not owner"); 1343 return condition.getWaitQueueLength(); 1344 } 1345 1346 /** 1347 * Returns a collection containing those threads that may be 1348 * waiting on the given condition associated with this 1349 * synchronizer. Because the actual set of threads may change 1350 * dynamically while constructing this result, the returned 1351 * collection is only a best-effort estimate. The elements of the 1352 * returned collection are in no particular order. 1353 * 1354 * @param condition the condition 1355 * @return the collection of threads 1356 * @throws IllegalMonitorStateException if exclusive synchronization 1357 * is not held 1358 * @throws IllegalArgumentException if the given condition is 1359 * not associated with this synchronizer 1360 * @throws NullPointerException if the condition is null 1361 */ 1362 public final Collection<Thread> getWaitingThreads(ConditionObject condition) { 1363 if (!owns(condition)) 1364 throw new IllegalArgumentException("Not owner"); 1365 return condition.getWaitingThreads(); 1366 } 1367 1368 /** 1369 * Condition implementation for a {@link 1370 * AbstractQueuedLongSynchronizer} serving as the basis of a {@link 1371 * Lock} implementation. 1372 * 1373 * <p>Method documentation for this class describes mechanics, 1374 * not behavioral specifications from the point of view of Lock 1375 * and Condition users. Exported versions of this class will in 1376 * general need to be accompanied by documentation describing 1377 * condition semantics that rely on those of the associated 1378 * {@code AbstractQueuedLongSynchronizer}. 1379 * 1380 * <p>This class is Serializable, but all fields are transient, 1381 * so deserialized conditions have no waiters. 1382 * 1383 * @since 1.6 1384 */ 1385 public class ConditionObject implements Condition, java.io.Serializable { 1386 private static final long serialVersionUID = 1173984872572414699L; 1387 /** First node of condition queue. */ 1388 private transient Node firstWaiter; 1389 /** Last node of condition queue. */ 1390 private transient Node lastWaiter; 1391 1392 /** 1393 * Creates a new {@code ConditionObject} instance. 1394 */ 1395 public ConditionObject() { } 1396 1397 // Internal methods 1398 1399 /** 1400 * Adds a new waiter to wait queue. 1401 * @return its new wait node 1402 */ 1403 private Node addConditionWaiter() { 1404 Node t = lastWaiter; 1405 // If lastWaiter is cancelled, clean out. 1406 if (t != null && t.waitStatus != Node.CONDITION) { 1407 unlinkCancelledWaiters(); 1408 t = lastWaiter; 1409 } 1410 1411 Node node = new Node(Node.CONDITION); 1412 1413 if (t == null) 1414 firstWaiter = node; 1415 else 1416 t.nextWaiter = node; 1417 lastWaiter = node; 1418 return node; 1419 } 1420 1421 /** 1422 * Removes and transfers nodes until hit non-cancelled one or 1423 * null. Split out from signal in part to encourage compilers 1424 * to inline the case of no waiters. 1425 * @param first (non-null) the first node on condition queue 1426 */ 1427 private void doSignal(Node first) { 1428 do { 1429 if ( (firstWaiter = first.nextWaiter) == null) 1430 lastWaiter = null; 1431 first.nextWaiter = null; 1432 } while (!transferForSignal(first) && 1433 (first = firstWaiter) != null); 1434 } 1435 1436 /** 1437 * Removes and transfers all nodes. 1438 * @param first (non-null) the first node on condition queue 1439 */ 1440 private void doSignalAll(Node first) { 1441 lastWaiter = firstWaiter = null; 1442 do { 1443 Node next = first.nextWaiter; 1444 first.nextWaiter = null; 1445 transferForSignal(first); 1446 first = next; 1447 } while (first != null); 1448 } 1449 1450 /** 1451 * Unlinks cancelled waiter nodes from condition queue. 1452 * Called only while holding lock. This is called when 1453 * cancellation occurred during condition wait, and upon 1454 * insertion of a new waiter when lastWaiter is seen to have 1455 * been cancelled. This method is needed to avoid garbage 1456 * retention in the absence of signals. So even though it may 1457 * require a full traversal, it comes into play only when 1458 * timeouts or cancellations occur in the absence of 1459 * signals. It traverses all nodes rather than stopping at a 1460 * particular target to unlink all pointers to garbage nodes 1461 * without requiring many re-traversals during cancellation 1462 * storms. 1463 */ 1464 private void unlinkCancelledWaiters() { 1465 Node t = firstWaiter; 1466 Node trail = null; 1467 while (t != null) { 1468 Node next = t.nextWaiter; 1469 if (t.waitStatus != Node.CONDITION) { 1470 t.nextWaiter = null; 1471 if (trail == null) 1472 firstWaiter = next; 1473 else 1474 trail.nextWaiter = next; 1475 if (next == null) 1476 lastWaiter = trail; 1477 } 1478 else 1479 trail = t; 1480 t = next; 1481 } 1482 } 1483 1484 // public methods 1485 1486 /** 1487 * Moves the longest-waiting thread, if one exists, from the 1488 * wait queue for this condition to the wait queue for the 1489 * owning lock. 1490 * 1491 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1492 * returns {@code false} 1493 */ 1494 public final void signal() { 1495 if (!isHeldExclusively()) 1496 throw new IllegalMonitorStateException(); 1497 Node first = firstWaiter; 1498 if (first != null) 1499 doSignal(first); 1500 } 1501 1502 /** 1503 * Moves all threads from the wait queue for this condition to 1504 * the wait queue for the owning lock. 1505 * 1506 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1507 * returns {@code false} 1508 */ 1509 public final void signalAll() { 1510 if (!isHeldExclusively()) 1511 throw new IllegalMonitorStateException(); 1512 Node first = firstWaiter; 1513 if (first != null) 1514 doSignalAll(first); 1515 } 1516 1517 /** 1518 * Implements uninterruptible condition wait. 1519 * <ol> 1520 * <li>Save lock state returned by {@link #getState}. 1521 * <li>Invoke {@link #release} with saved state as argument, 1522 * throwing IllegalMonitorStateException if it fails. 1523 * <li>Block until signalled. 1524 * <li>Reacquire by invoking specialized version of 1525 * {@link #acquire} with saved state as argument. 1526 * </ol> 1527 */ 1528 public final void awaitUninterruptibly() { 1529 Node node = addConditionWaiter(); 1530 long savedState = fullyRelease(node); 1531 boolean interrupted = false; 1532 while (!isOnSyncQueue(node)) { 1533 LockSupport.park(this); 1534 if (Thread.interrupted()) 1535 interrupted = true; 1536 } 1537 if (acquireQueued(node, savedState) || interrupted) 1538 selfInterrupt(); 1539 } 1540 1541 /* 1542 * For interruptible waits, we need to track whether to throw 1543 * InterruptedException, if interrupted while blocked on 1544 * condition, versus reinterrupt current thread, if 1545 * interrupted while blocked waiting to re-acquire. 1546 */ 1547 1548 /** Mode meaning to reinterrupt on exit from wait */ 1549 private static final int REINTERRUPT = 1; 1550 /** Mode meaning to throw InterruptedException on exit from wait */ 1551 private static final int THROW_IE = -1; 1552 1553 /** 1554 * Checks for interrupt, returning THROW_IE if interrupted 1555 * before signalled, REINTERRUPT if after signalled, or 1556 * 0 if not interrupted. 1557 */ 1558 private int checkInterruptWhileWaiting(Node node) { 1559 return Thread.interrupted() ? 1560 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 1561 0; 1562 } 1563 1564 /** 1565 * Throws InterruptedException, reinterrupts current thread, or 1566 * does nothing, depending on mode. 1567 */ 1568 private void reportInterruptAfterWait(int interruptMode) 1569 throws InterruptedException { 1570 if (interruptMode == THROW_IE) 1571 throw new InterruptedException(); 1572 else if (interruptMode == REINTERRUPT) 1573 selfInterrupt(); 1574 } 1575 1576 /** 1577 * Implements interruptible condition wait. 1578 * <ol> 1579 * <li>If current thread is interrupted, throw InterruptedException. 1580 * <li>Save lock state returned by {@link #getState}. 1581 * <li>Invoke {@link #release} with saved state as argument, 1582 * throwing IllegalMonitorStateException if it fails. 1583 * <li>Block until signalled or interrupted. 1584 * <li>Reacquire by invoking specialized version of 1585 * {@link #acquire} with saved state as argument. 1586 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1587 * </ol> 1588 */ 1589 public final void await() throws InterruptedException { 1590 if (Thread.interrupted()) 1591 throw new InterruptedException(); 1592 Node node = addConditionWaiter(); 1593 long savedState = fullyRelease(node); 1594 int interruptMode = 0; 1595 while (!isOnSyncQueue(node)) { 1596 LockSupport.park(this); 1597 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1598 break; 1599 } 1600 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1601 interruptMode = REINTERRUPT; 1602 if (node.nextWaiter != null) // clean up if cancelled 1603 unlinkCancelledWaiters(); 1604 if (interruptMode != 0) 1605 reportInterruptAfterWait(interruptMode); 1606 } 1607 1608 /** 1609 * Implements timed condition wait. 1610 * <ol> 1611 * <li>If current thread is interrupted, throw InterruptedException. 1612 * <li>Save lock state returned by {@link #getState}. 1613 * <li>Invoke {@link #release} with saved state as argument, 1614 * throwing IllegalMonitorStateException if it fails. 1615 * <li>Block until signalled, interrupted, or timed out. 1616 * <li>Reacquire by invoking specialized version of 1617 * {@link #acquire} with saved state as argument. 1618 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1619 * </ol> 1620 */ 1621 public final long awaitNanos(long nanosTimeout) 1622 throws InterruptedException { 1623 if (Thread.interrupted()) 1624 throw new InterruptedException(); 1625 // We don't check for nanosTimeout <= 0L here, to allow 1626 // awaitNanos(0) as a way to "yield the lock". 1627 final long deadline = System.nanoTime() + nanosTimeout; 1628 long initialNanos = nanosTimeout; 1629 Node node = addConditionWaiter(); 1630 long savedState = fullyRelease(node); 1631 int interruptMode = 0; 1632 while (!isOnSyncQueue(node)) { 1633 if (nanosTimeout <= 0L) { 1634 transferAfterCancelledWait(node); 1635 break; 1636 } 1637 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 1638 LockSupport.parkNanos(this, nanosTimeout); 1639 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1640 break; 1641 nanosTimeout = deadline - System.nanoTime(); 1642 } 1643 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1644 interruptMode = REINTERRUPT; 1645 if (node.nextWaiter != null) 1646 unlinkCancelledWaiters(); 1647 if (interruptMode != 0) 1648 reportInterruptAfterWait(interruptMode); 1649 long remaining = deadline - System.nanoTime(); // avoid overflow 1650 return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE; 1651 } 1652 1653 /** 1654 * Implements absolute timed condition wait. 1655 * <ol> 1656 * <li>If current thread is interrupted, throw InterruptedException. 1657 * <li>Save lock state returned by {@link #getState}. 1658 * <li>Invoke {@link #release} with saved state as argument, 1659 * throwing IllegalMonitorStateException if it fails. 1660 * <li>Block until signalled, interrupted, or timed out. 1661 * <li>Reacquire by invoking specialized version of 1662 * {@link #acquire} with saved state as argument. 1663 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1664 * <li>If timed out while blocked in step 4, return false, else true. 1665 * </ol> 1666 */ 1667 public final boolean awaitUntil(Date deadline) 1668 throws InterruptedException { 1669 long abstime = deadline.getTime(); 1670 if (Thread.interrupted()) 1671 throw new InterruptedException(); 1672 Node node = addConditionWaiter(); 1673 long savedState = fullyRelease(node); 1674 boolean timedout = false; 1675 int interruptMode = 0; 1676 while (!isOnSyncQueue(node)) { 1677 if (System.currentTimeMillis() >= abstime) { 1678 timedout = transferAfterCancelledWait(node); 1679 break; 1680 } 1681 LockSupport.parkUntil(this, abstime); 1682 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1683 break; 1684 } 1685 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1686 interruptMode = REINTERRUPT; 1687 if (node.nextWaiter != null) 1688 unlinkCancelledWaiters(); 1689 if (interruptMode != 0) 1690 reportInterruptAfterWait(interruptMode); 1691 return !timedout; 1692 } 1693 1694 /** 1695 * Implements timed condition wait. 1696 * <ol> 1697 * <li>If current thread is interrupted, throw InterruptedException. 1698 * <li>Save lock state returned by {@link #getState}. 1699 * <li>Invoke {@link #release} with saved state as argument, 1700 * throwing IllegalMonitorStateException if it fails. 1701 * <li>Block until signalled, interrupted, or timed out. 1702 * <li>Reacquire by invoking specialized version of 1703 * {@link #acquire} with saved state as argument. 1704 * <li>If interrupted while blocked in step 4, throw InterruptedException. 1705 * <li>If timed out while blocked in step 4, return false, else true. 1706 * </ol> 1707 */ 1708 public final boolean await(long time, TimeUnit unit) 1709 throws InterruptedException { 1710 long nanosTimeout = unit.toNanos(time); 1711 if (Thread.interrupted()) 1712 throw new InterruptedException(); 1713 // We don't check for nanosTimeout <= 0L here, to allow 1714 // await(0, unit) as a way to "yield the lock". 1715 final long deadline = System.nanoTime() + nanosTimeout; 1716 Node node = addConditionWaiter(); 1717 long savedState = fullyRelease(node); 1718 boolean timedout = false; 1719 int interruptMode = 0; 1720 while (!isOnSyncQueue(node)) { 1721 if (nanosTimeout <= 0L) { 1722 timedout = transferAfterCancelledWait(node); 1723 break; 1724 } 1725 if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) 1726 LockSupport.parkNanos(this, nanosTimeout); 1727 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 1728 break; 1729 nanosTimeout = deadline - System.nanoTime(); 1730 } 1731 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 1732 interruptMode = REINTERRUPT; 1733 if (node.nextWaiter != null) 1734 unlinkCancelledWaiters(); 1735 if (interruptMode != 0) 1736 reportInterruptAfterWait(interruptMode); 1737 return !timedout; 1738 } 1739 1740 // support for instrumentation 1741 1742 /** 1743 * Returns true if this condition was created by the given 1744 * synchronization object. 1745 * 1746 * @return {@code true} if owned 1747 */ 1748 final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) { 1749 return sync == AbstractQueuedLongSynchronizer.this; 1750 } 1751 1752 /** 1753 * Queries whether any threads are waiting on this condition. 1754 * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}. 1755 * 1756 * @return {@code true} if there are any waiting threads 1757 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1758 * returns {@code false} 1759 */ 1760 protected final boolean hasWaiters() { 1761 if (!isHeldExclusively()) 1762 throw new IllegalMonitorStateException(); 1763 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1764 if (w.waitStatus == Node.CONDITION) 1765 return true; 1766 } 1767 return false; 1768 } 1769 1770 /** 1771 * Returns an estimate of the number of threads waiting on 1772 * this condition. 1773 * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}. 1774 * 1775 * @return the estimated number of waiting threads 1776 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1777 * returns {@code false} 1778 */ 1779 protected final int getWaitQueueLength() { 1780 if (!isHeldExclusively()) 1781 throw new IllegalMonitorStateException(); 1782 int n = 0; 1783 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1784 if (w.waitStatus == Node.CONDITION) 1785 ++n; 1786 } 1787 return n; 1788 } 1789 1790 /** 1791 * Returns a collection containing those threads that may be 1792 * waiting on this Condition. 1793 * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}. 1794 * 1795 * @return the collection of threads 1796 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} 1797 * returns {@code false} 1798 */ 1799 protected final Collection<Thread> getWaitingThreads() { 1800 if (!isHeldExclusively()) 1801 throw new IllegalMonitorStateException(); 1802 ArrayList<Thread> list = new ArrayList<>(); 1803 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 1804 if (w.waitStatus == Node.CONDITION) { 1805 Thread t = w.thread; 1806 if (t != null) 1807 list.add(t); 1808 } 1809 } 1810 return list; 1811 } 1812 } 1813 1814 // VarHandle mechanics 1815 private static final VarHandle STATE; 1816 private static final VarHandle HEAD; 1817 private static final VarHandle TAIL; 1818 1819 static { 1820 try { 1821 MethodHandles.Lookup l = MethodHandles.lookup(); 1822 STATE = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "state", long.class); 1823 HEAD = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "head", Node.class); 1824 TAIL = l.findVarHandle(AbstractQueuedLongSynchronizer.class, "tail", Node.class); 1825 } catch (ReflectiveOperationException e) { 1826 throw new Error(e); 1827 } 1828 1829 // Reduce the risk of rare disastrous classloading in first call to 1830 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 1831 Class<?> ensureLoaded = LockSupport.class; 1832 } 1833 1834 /** 1835 * Initializes head and tail fields on first contention. 1836 */ 1837 private final void initializeSyncQueue() { 1838 Node h; 1839 if (HEAD.compareAndSet(this, null, (h = new Node()))) 1840 tail = h; 1841 } 1842 1843 /** 1844 * CASes tail field. 1845 */ 1846 private final boolean compareAndSetTail(Node expect, Node update) { 1847 return TAIL.compareAndSet(this, expect, update); 1848 } 1849} 1850