1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25/* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 
28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36package java.util.concurrent; 37 38import java.security.AccessControlContext; 39import java.security.AccessController; 40import java.security.PrivilegedAction; 41import java.util.ArrayList; 42import java.util.ConcurrentModificationException; 43import java.util.HashSet; 44import java.util.Iterator; 45import java.util.List; 46import java.util.concurrent.atomic.AtomicInteger; 47import java.util.concurrent.locks.AbstractQueuedSynchronizer; 48import java.util.concurrent.locks.Condition; 49import java.util.concurrent.locks.ReentrantLock; 50 51/** 52 * An {@link ExecutorService} that executes each submitted task using 53 * one of possibly several pooled threads, normally configured 54 * using {@link Executors} factory methods. 55 * 56 * <p>Thread pools address two different problems: they usually 57 * provide improved performance when executing large numbers of 58 * asynchronous tasks, due to reduced per-task invocation overhead, 59 * and they provide a means of bounding and managing the resources, 60 * including threads, consumed when executing a collection of tasks. 61 * Each {@code ThreadPoolExecutor} also maintains some basic 62 * statistics, such as the number of completed tasks. 63 * 64 * <p>To be useful across a wide range of contexts, this class 65 * provides many adjustable parameters and extensibility 66 * hooks. 
However, programmers are urged to use the more convenient 67 * {@link Executors} factory methods {@link 68 * Executors#newCachedThreadPool} (unbounded thread pool, with 69 * automatic thread reclamation), {@link Executors#newFixedThreadPool} 70 * (fixed size thread pool) and {@link 71 * Executors#newSingleThreadExecutor} (single background thread), that 72 * preconfigure settings for the most common usage 73 * scenarios. Otherwise, use the following guide when manually 74 * configuring and tuning this class: 75 * 76 * <dl> 77 * 78 * <dt>Core and maximum pool sizes</dt> 79 * 80 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the 81 * pool size (see {@link #getPoolSize}) 82 * according to the bounds set by 83 * corePoolSize (see {@link #getCorePoolSize}) and 84 * maximumPoolSize (see {@link #getMaximumPoolSize}). 85 * 86 * When a new task is submitted in method {@link #execute(Runnable)}, 87 * if fewer than corePoolSize threads are running, a new thread is 88 * created to handle the request, even if other worker threads are 89 * idle. Else if fewer than maximumPoolSize threads are running, a 90 * new thread will be created to handle the request only if the queue 91 * is full. By setting corePoolSize and maximumPoolSize the same, you 92 * create a fixed-size thread pool. By setting maximumPoolSize to an 93 * essentially unbounded value such as {@code Integer.MAX_VALUE}, you 94 * allow the pool to accommodate an arbitrary number of concurrent 95 * tasks. Most typically, core and maximum pool sizes are set only 96 * upon construction, but they may also be changed dynamically using 97 * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd> 98 * 99 * <dt>On-demand construction</dt> 100 * 101 * <dd>By default, even core threads are initially created and 102 * started only when new tasks arrive, but this can be overridden 103 * dynamically using method {@link #prestartCoreThread} or {@link 104 * #prestartAllCoreThreads}. 
You probably want to prestart threads if 105 * you construct the pool with a non-empty queue. </dd> 106 * 107 * <dt>Creating new threads</dt> 108 * 109 * <dd>New threads are created using a {@link ThreadFactory}. If not 110 * otherwise specified, a {@link Executors#defaultThreadFactory} is 111 * used, that creates threads to all be in the same {@link 112 * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and 113 * non-daemon status. By supplying a different ThreadFactory, you can 114 * alter the thread's name, thread group, priority, daemon status, 115 * etc. If a {@code ThreadFactory} fails to create a thread when asked 116 * by returning null from {@code newThread}, the executor will 117 * continue, but might not be able to execute any tasks. Threads 118 * should possess the "modifyThread" {@code RuntimePermission}. If 119 * worker threads or other threads using the pool do not possess this 120 * permission, service may be degraded: configuration changes may not 121 * take effect in a timely manner, and a shutdown pool may remain in a 122 * state in which termination is possible but not completed.</dd> 123 * 124 * <dt>Keep-alive times</dt> 125 * 126 * <dd>If the pool currently has more than corePoolSize threads, 127 * excess threads will be terminated if they have been idle for more 128 * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}). 129 * This provides a means of reducing resource consumption when the 130 * pool is not being actively used. If the pool becomes more active 131 * later, new threads will be constructed. This parameter can also be 132 * changed dynamically using method {@link #setKeepAliveTime(long, 133 * TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link 134 * TimeUnit#NANOSECONDS} effectively disables idle threads from ever 135 * terminating prior to shut down. 
By default, the keep-alive policy 136 * applies only when there are more than corePoolSize threads, but 137 * method {@link #allowCoreThreadTimeOut(boolean)} can be used to 138 * apply this time-out policy to core threads as well, so long as the 139 * keepAliveTime value is non-zero. </dd> 140 * 141 * <dt>Queuing</dt> 142 * 143 * <dd>Any {@link BlockingQueue} may be used to transfer and hold 144 * submitted tasks. The use of this queue interacts with pool sizing: 145 * 146 * <ul> 147 * 148 * <li>If fewer than corePoolSize threads are running, the Executor 149 * always prefers adding a new thread 150 * rather than queuing. 151 * 152 * <li>If corePoolSize or more threads are running, the Executor 153 * always prefers queuing a request rather than adding a new 154 * thread. 155 * 156 * <li>If a request cannot be queued, a new thread is created unless 157 * this would exceed maximumPoolSize, in which case, the task will be 158 * rejected. 159 * 160 * </ul> 161 * 162 * There are three general strategies for queuing: 163 * <ol> 164 * 165 * <li><em> Direct handoffs.</em> A good default choice for a work 166 * queue is a {@link SynchronousQueue} that hands off tasks to threads 167 * without otherwise holding them. Here, an attempt to queue a task 168 * will fail if no threads are immediately available to run it, so a 169 * new thread will be constructed. This policy avoids lockups when 170 * handling sets of requests that might have internal dependencies. 171 * Direct handoffs generally require unbounded maximumPoolSizes to 172 * avoid rejection of new submitted tasks. This in turn admits the 173 * possibility of unbounded thread growth when commands continue to 174 * arrive on average faster than they can be processed. 175 * 176 * <li><em> Unbounded queues.</em> Using an unbounded queue (for 177 * example a {@link LinkedBlockingQueue} without a predefined 178 * capacity) will cause new tasks to wait in the queue when all 179 * corePoolSize threads are busy. 
 * Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.)  This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each other's execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed.
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control.  Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput.  If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput.
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 * <em>rejected</em> when the Executor has been shut down, and also when
 * the Executor uses finite bounds for both maximum threads and work queue
 * capacity, and is saturated.  In either case, the {@code execute} method
 * invokes the {@link
 * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 * method of its {@link RejectedExecutionHandler}.
Four predefined handler 215 * policies are provided: 216 * 217 * <ol> 218 * 219 * <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the handler 220 * throws a runtime {@link RejectedExecutionException} upon rejection. 221 * 222 * <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread 223 * that invokes {@code execute} itself runs the task. This provides a 224 * simple feedback control mechanism that will slow down the rate that 225 * new tasks are submitted. 226 * 227 * <li>In {@link ThreadPoolExecutor.DiscardPolicy}, a task that 228 * cannot be executed is simply dropped. 229 * 230 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the 231 * executor is not shut down, the task at the head of the work queue 232 * is dropped, and then execution is retried (which can fail again, 233 * causing this to be repeated.) 234 * 235 * </ol> 236 * 237 * It is possible to define and use other kinds of {@link 238 * RejectedExecutionHandler} classes. Doing so requires some care 239 * especially when policies are designed to work only under particular 240 * capacity or queuing policies. </dd> 241 * 242 * <dt>Hook methods</dt> 243 * 244 * <dd>This class provides {@code protected} overridable 245 * {@link #beforeExecute(Thread, Runnable)} and 246 * {@link #afterExecute(Runnable, Throwable)} methods that are called 247 * before and after execution of each task. These can be used to 248 * manipulate the execution environment; for example, reinitializing 249 * ThreadLocals, gathering statistics, or adding log entries. 250 * Additionally, method {@link #terminated} can be overridden to perform 251 * any special processing that needs to be done once the Executor has 252 * fully terminated. 
253 * 254 * <p>If hook, callback, or BlockingQueue methods throw exceptions, 255 * internal worker threads may in turn fail, abruptly terminate, and 256 * possibly be replaced.</dd> 257 * 258 * <dt>Queue maintenance</dt> 259 * 260 * <dd>Method {@link #getQueue()} allows access to the work queue 261 * for purposes of monitoring and debugging. Use of this method for 262 * any other purpose is strongly discouraged. Two supplied methods, 263 * {@link #remove(Runnable)} and {@link #purge} are available to 264 * assist in storage reclamation when large numbers of queued tasks 265 * become cancelled.</dd> 266 * 267 * <dt>Finalization</dt> 268 * 269 * <dd>A pool that is no longer referenced in a program <em>AND</em> 270 * has no remaining threads will be {@code shutdown} automatically. If 271 * you would like to ensure that unreferenced pools are reclaimed even 272 * if users forget to call {@link #shutdown}, then you must arrange 273 * that unused threads eventually die, by setting appropriate 274 * keep-alive times, using a lower bound of zero core threads and/or 275 * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd> 276 * 277 * </dl> 278 * 279 * <p><b>Extension example</b>. Most extensions of this class 280 * override one or more of the protected hook methods. For example, 281 * here is a subclass that adds a simple pause/resume feature: 282 * 283 * <pre> {@code 284 * class PausableThreadPoolExecutor extends ThreadPoolExecutor { 285 * private boolean isPaused; 286 * private ReentrantLock pauseLock = new ReentrantLock(); 287 * private Condition unpaused = pauseLock.newCondition(); 288 * 289 * public PausableThreadPoolExecutor(...) 
{ super(...); } 290 * 291 * protected void beforeExecute(Thread t, Runnable r) { 292 * super.beforeExecute(t, r); 293 * pauseLock.lock(); 294 * try { 295 * while (isPaused) unpaused.await(); 296 * } catch (InterruptedException ie) { 297 * t.interrupt(); 298 * } finally { 299 * pauseLock.unlock(); 300 * } 301 * } 302 * 303 * public void pause() { 304 * pauseLock.lock(); 305 * try { 306 * isPaused = true; 307 * } finally { 308 * pauseLock.unlock(); 309 * } 310 * } 311 * 312 * public void resume() { 313 * pauseLock.lock(); 314 * try { 315 * isPaused = false; 316 * unpaused.signalAll(); 317 * } finally { 318 * pauseLock.unlock(); 319 * } 320 * } 321 * }}</pre> 322 * 323 * @since 1.5 324 * @author Doug Lea 325 */ 326public class ThreadPoolExecutor extends AbstractExecutorService { 327 /** 328 * The main pool control state, ctl, is an atomic integer packing 329 * two conceptual fields 330 * workerCount, indicating the effective number of threads 331 * runState, indicating whether running, shutting down etc 332 * 333 * In order to pack them into one int, we limit workerCount to 334 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 335 * billion) otherwise representable. If this is ever an issue in 336 * the future, the variable can be changed to be an AtomicLong, 337 * and the shift/mask constants below adjusted. But until the need 338 * arises, this code is a bit faster and simpler using an int. 339 * 340 * The workerCount is the number of workers that have been 341 * permitted to start and not permitted to stop. The value may be 342 * transiently different from the actual number of live threads, 343 * for example when a ThreadFactory fails to create a thread when 344 * asked, and when exiting threads are still performing 345 * bookkeeping before terminating. The user-visible pool size is 346 * reported as the current size of the workers set. 
347 * 348 * The runState provides the main lifecycle control, taking on values: 349 * 350 * RUNNING: Accept new tasks and process queued tasks 351 * SHUTDOWN: Don't accept new tasks, but process queued tasks 352 * STOP: Don't accept new tasks, don't process queued tasks, 353 * and interrupt in-progress tasks 354 * TIDYING: All tasks have terminated, workerCount is zero, 355 * the thread transitioning to state TIDYING 356 * will run the terminated() hook method 357 * TERMINATED: terminated() has completed 358 * 359 * The numerical order among these values matters, to allow 360 * ordered comparisons. The runState monotonically increases over 361 * time, but need not hit each state. The transitions are: 362 * 363 * RUNNING -> SHUTDOWN 364 * On invocation of shutdown(), perhaps implicitly in finalize() 365 * (RUNNING or SHUTDOWN) -> STOP 366 * On invocation of shutdownNow() 367 * SHUTDOWN -> TIDYING 368 * When both queue and pool are empty 369 * STOP -> TIDYING 370 * When pool is empty 371 * TIDYING -> TERMINATED 372 * When the terminated() hook method has completed 373 * 374 * Threads waiting in awaitTermination() will return when the 375 * state reaches TERMINATED. 376 * 377 * Detecting the transition from SHUTDOWN to TIDYING is less 378 * straightforward than you'd like because the queue may become 379 * empty after non-empty and vice versa during SHUTDOWN state, but 380 * we can only terminate if, after seeing that it is empty, we see 381 * that workerCount is 0 (which sometimes entails a recheck -- see 382 * below). 
383 */ 384 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 385 private static final int COUNT_BITS = Integer.SIZE - 3; 386 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 387 388 // runState is stored in the high-order bits 389 private static final int RUNNING = -1 << COUNT_BITS; 390 private static final int SHUTDOWN = 0 << COUNT_BITS; 391 private static final int STOP = 1 << COUNT_BITS; 392 private static final int TIDYING = 2 << COUNT_BITS; 393 private static final int TERMINATED = 3 << COUNT_BITS; 394 395 // Packing and unpacking ctl 396 private static int runStateOf(int c) { return c & ~CAPACITY; } 397 private static int workerCountOf(int c) { return c & CAPACITY; } 398 private static int ctlOf(int rs, int wc) { return rs | wc; } 399 400 /* 401 * Bit field accessors that don't require unpacking ctl. 402 * These depend on the bit layout and on workerCount being never negative. 403 */ 404 405 private static boolean runStateLessThan(int c, int s) { 406 return c < s; 407 } 408 409 private static boolean runStateAtLeast(int c, int s) { 410 return c >= s; 411 } 412 413 private static boolean isRunning(int c) { 414 return c < SHUTDOWN; 415 } 416 417 /** 418 * Attempts to CAS-increment the workerCount field of ctl. 419 */ 420 private boolean compareAndIncrementWorkerCount(int expect) { 421 return ctl.compareAndSet(expect, expect + 1); 422 } 423 424 /** 425 * Attempts to CAS-decrement the workerCount field of ctl. 426 */ 427 private boolean compareAndDecrementWorkerCount(int expect) { 428 return ctl.compareAndSet(expect, expect - 1); 429 } 430 431 /** 432 * Decrements the workerCount field of ctl. This is called only on 433 * abrupt termination of a thread (see processWorkerExit). Other 434 * decrements are performed within getTask. 435 */ 436 private void decrementWorkerCount() { 437 do {} while (! 
compareAndDecrementWorkerCount(ctl.get())); 438 } 439 440 /** 441 * The queue used for holding tasks and handing off to worker 442 * threads. We do not require that workQueue.poll() returning 443 * null necessarily means that workQueue.isEmpty(), so rely 444 * solely on isEmpty to see if the queue is empty (which we must 445 * do for example when deciding whether to transition from 446 * SHUTDOWN to TIDYING). This accommodates special-purpose 447 * queues such as DelayQueues for which poll() is allowed to 448 * return null even if it may later return non-null when delays 449 * expire. 450 */ 451 private final BlockingQueue<Runnable> workQueue; 452 453 /** 454 * Lock held on access to workers set and related bookkeeping. 455 * While we could use a concurrent set of some sort, it turns out 456 * to be generally preferable to use a lock. Among the reasons is 457 * that this serializes interruptIdleWorkers, which avoids 458 * unnecessary interrupt storms, especially during shutdown. 459 * Otherwise exiting threads would concurrently interrupt those 460 * that have not yet interrupted. It also simplifies some of the 461 * associated statistics bookkeeping of largestPoolSize etc. We 462 * also hold mainLock on shutdown and shutdownNow, for the sake of 463 * ensuring workers set is stable while separately checking 464 * permission to interrupt and actually interrupting. 465 */ 466 private final ReentrantLock mainLock = new ReentrantLock(); 467 468 /** 469 * Set containing all worker threads in pool. Accessed only when 470 * holding mainLock. 471 */ 472 private final HashSet<Worker> workers = new HashSet<>(); 473 474 /** 475 * Wait condition to support awaitTermination. 476 */ 477 private final Condition termination = mainLock.newCondition(); 478 479 /** 480 * Tracks largest attained pool size. Accessed only under 481 * mainLock. 482 */ 483 private int largestPoolSize; 484 485 /** 486 * Counter for completed tasks. Updated only on termination of 487 * worker threads. 
Accessed only under mainLock. 488 */ 489 private long completedTaskCount; 490 491 /* 492 * All user control parameters are declared as volatiles so that 493 * ongoing actions are based on freshest values, but without need 494 * for locking, since no internal invariants depend on them 495 * changing synchronously with respect to other actions. 496 */ 497 498 /** 499 * Factory for new threads. All threads are created using this 500 * factory (via method addWorker). All callers must be prepared 501 * for addWorker to fail, which may reflect a system or user's 502 * policy limiting the number of threads. Even though it is not 503 * treated as an error, failure to create threads may result in 504 * new tasks being rejected or existing ones remaining stuck in 505 * the queue. 506 * 507 * We go further and preserve pool invariants even in the face of 508 * errors such as OutOfMemoryError, that might be thrown while 509 * trying to create threads. Such errors are rather common due to 510 * the need to allocate a native stack in Thread.start, and users 511 * will want to perform clean pool shutdown to clean up. There 512 * will likely be enough memory available for the cleanup code to 513 * complete without encountering yet another OutOfMemoryError. 514 */ 515 private volatile ThreadFactory threadFactory; 516 517 /** 518 * Handler called when saturated or shutdown in execute. 519 */ 520 private volatile RejectedExecutionHandler handler; 521 522 /** 523 * Timeout in nanoseconds for idle threads waiting for work. 524 * Threads use this timeout when there are more than corePoolSize 525 * present or if allowCoreThreadTimeOut. Otherwise they wait 526 * forever for new work. 527 */ 528 private volatile long keepAliveTime; 529 530 /** 531 * If false (default), core threads stay alive even when idle. 532 * If true, core threads use keepAliveTime to time out waiting 533 * for work. 
534 */ 535 private volatile boolean allowCoreThreadTimeOut; 536 537 /** 538 * Core pool size is the minimum number of workers to keep alive 539 * (and not allow to time out etc) unless allowCoreThreadTimeOut 540 * is set, in which case the minimum is zero. 541 */ 542 private volatile int corePoolSize; 543 544 /** 545 * Maximum pool size. Note that the actual maximum is internally 546 * bounded by CAPACITY. 547 */ 548 private volatile int maximumPoolSize; 549 550 /** 551 * The default rejected execution handler. 552 */ 553 private static final RejectedExecutionHandler defaultHandler = 554 new AbortPolicy(); 555 556 /** 557 * Permission required for callers of shutdown and shutdownNow. 558 * We additionally require (see checkShutdownAccess) that callers 559 * have permission to actually interrupt threads in the worker set 560 * (as governed by Thread.interrupt, which relies on 561 * ThreadGroup.checkAccess, which in turn relies on 562 * SecurityManager.checkAccess). Shutdowns are attempted only if 563 * these checks pass. 564 * 565 * All actual invocations of Thread.interrupt (see 566 * interruptIdleWorkers and interruptWorkers) ignore 567 * SecurityExceptions, meaning that the attempted interrupts 568 * silently fail. In the case of shutdown, they should not fail 569 * unless the SecurityManager has inconsistent policies, sometimes 570 * allowing access to a thread and sometimes not. In such cases, 571 * failure to actually interrupt threads may disable or delay full 572 * termination. Other uses of interruptIdleWorkers are advisory, 573 * and failure to actually interrupt will merely delay response to 574 * configuration changes so is not handled exceptionally. 575 */ 576 private static final RuntimePermission shutdownPerm = 577 new RuntimePermission("modifyThread"); 578 579 /** The context to be used when executing the finalizer, or null. 
*/ 580 private final AccessControlContext acc; 581 582 /** 583 * Class Worker mainly maintains interrupt control state for 584 * threads running tasks, along with other minor bookkeeping. 585 * This class opportunistically extends AbstractQueuedSynchronizer 586 * to simplify acquiring and releasing a lock surrounding each 587 * task execution. This protects against interrupts that are 588 * intended to wake up a worker thread waiting for a task from 589 * instead interrupting a task being run. We implement a simple 590 * non-reentrant mutual exclusion lock rather than use 591 * ReentrantLock because we do not want worker tasks to be able to 592 * reacquire the lock when they invoke pool control methods like 593 * setCorePoolSize. Additionally, to suppress interrupts until 594 * the thread actually starts running tasks, we initialize lock 595 * state to a negative value, and clear it upon start (in 596 * runWorker). 597 */ 598 private final class Worker 599 extends AbstractQueuedSynchronizer 600 implements Runnable 601 { 602 /** 603 * This class will never be serialized, but we provide a 604 * serialVersionUID to suppress a javac warning. 605 */ 606 private static final long serialVersionUID = 6138294804551838833L; 607 608 /** Thread this worker is running in. Null if factory fails. */ 609 final Thread thread; 610 /** Initial task to run. Possibly null. */ 611 Runnable firstTask; 612 /** Per-thread task counter */ 613 volatile long completedTasks; 614 615 // TODO: switch to AbstractQueuedLongSynchronizer and move 616 // completedTasks into the lock word. 617 618 /** 619 * Creates with given first task and thread from ThreadFactory. 620 * @param firstTask the first task (null if none) 621 */ 622 Worker(Runnable firstTask) { 623 setState(-1); // inhibit interrupts until runWorker 624 this.firstTask = firstTask; 625 this.thread = getThreadFactory().newThread(this); 626 } 627 628 /** Delegates main run loop to outer runWorker. 
*/ 629 public void run() { 630 runWorker(this); 631 } 632 633 // Lock methods 634 // 635 // The value 0 represents the unlocked state. 636 // The value 1 represents the locked state. 637 638 protected boolean isHeldExclusively() { 639 return getState() != 0; 640 } 641 642 protected boolean tryAcquire(int unused) { 643 if (compareAndSetState(0, 1)) { 644 setExclusiveOwnerThread(Thread.currentThread()); 645 return true; 646 } 647 return false; 648 } 649 650 protected boolean tryRelease(int unused) { 651 setExclusiveOwnerThread(null); 652 setState(0); 653 return true; 654 } 655 656 public void lock() { acquire(1); } 657 public boolean tryLock() { return tryAcquire(1); } 658 public void unlock() { release(1); } 659 public boolean isLocked() { return isHeldExclusively(); } 660 661 void interruptIfStarted() { 662 Thread t; 663 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 664 try { 665 t.interrupt(); 666 } catch (SecurityException ignore) { 667 } 668 } 669 } 670 } 671 672 /* 673 * Methods for setting control state 674 */ 675 676 /** 677 * Transitions runState to given target, or leaves it alone if 678 * already at least the given target. 679 * 680 * @param targetState the desired state, either SHUTDOWN or STOP 681 * (but not TIDYING or TERMINATED -- use tryTerminate for that) 682 */ 683 private void advanceRunState(int targetState) { 684 // assert targetState == SHUTDOWN || targetState == STOP; 685 for (;;) { 686 int c = ctl.get(); 687 if (runStateAtLeast(c, targetState) || 688 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 689 break; 690 } 691 } 692 693 /** 694 * Transitions to TERMINATED state if either (SHUTDOWN and pool 695 * and queue empty) or (STOP and pool empty). If otherwise 696 * eligible to terminate but workerCount is nonzero, interrupts an 697 * idle worker to ensure that shutdown signals propagate. 
This 698 * method must be called following any action that might make 699 * termination possible -- reducing worker count or removing tasks 700 * from the queue during shutdown. The method is non-private to 701 * allow access from ScheduledThreadPoolExecutor. 702 */ 703 final void tryTerminate() { 704 for (;;) { 705 int c = ctl.get(); 706 if (isRunning(c) || 707 runStateAtLeast(c, TIDYING) || 708 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 709 return; 710 if (workerCountOf(c) != 0) { // Eligible to terminate 711 interruptIdleWorkers(ONLY_ONE); 712 return; 713 } 714 715 final ReentrantLock mainLock = this.mainLock; 716 mainLock.lock(); 717 try { 718 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 719 try { 720 terminated(); 721 } finally { 722 ctl.set(ctlOf(TERMINATED, 0)); 723 termination.signalAll(); 724 } 725 return; 726 } 727 } finally { 728 mainLock.unlock(); 729 } 730 // else retry on failed CAS 731 } 732 } 733 734 /* 735 * Methods for controlling interrupts to worker threads. 736 */ 737 738 /** 739 * If there is a security manager, makes sure caller has 740 * permission to shut down threads in general (see shutdownPerm). 741 * If this passes, additionally makes sure the caller is allowed 742 * to interrupt each worker thread. This might not be true even if 743 * first check passed, if the SecurityManager treats some threads 744 * specially. 745 */ 746 private void checkShutdownAccess() { 747 SecurityManager security = System.getSecurityManager(); 748 if (security != null) { 749 security.checkPermission(shutdownPerm); 750 final ReentrantLock mainLock = this.mainLock; 751 mainLock.lock(); 752 try { 753 for (Worker w : workers) 754 security.checkAccess(w.thread); 755 } finally { 756 mainLock.unlock(); 757 } 758 } 759 } 760 761 /** 762 * Interrupts all threads, even if active. Ignores SecurityExceptions 763 * (in which case some threads may remain uninterrupted). 
764 */ 765 private void interruptWorkers() { 766 final ReentrantLock mainLock = this.mainLock; 767 mainLock.lock(); 768 try { 769 for (Worker w : workers) 770 w.interruptIfStarted(); 771 } finally { 772 mainLock.unlock(); 773 } 774 } 775 776 /** 777 * Interrupts threads that might be waiting for tasks (as 778 * indicated by not being locked) so they can check for 779 * termination or configuration changes. Ignores 780 * SecurityExceptions (in which case some threads may remain 781 * uninterrupted). 782 * 783 * @param onlyOne If true, interrupt at most one worker. This is 784 * called only from tryTerminate when termination is otherwise 785 * enabled but there are still other workers. In this case, at 786 * most one waiting worker is interrupted to propagate shutdown 787 * signals in case all threads are currently waiting. 788 * Interrupting any arbitrary thread ensures that newly arriving 789 * workers since shutdown began will also eventually exit. 790 * To guarantee eventual termination, it suffices to always 791 * interrupt only one idle worker, but shutdown() interrupts all 792 * idle workers so that redundant workers exit promptly, not 793 * waiting for a straggler task to finish. 794 */ 795 private void interruptIdleWorkers(boolean onlyOne) { 796 final ReentrantLock mainLock = this.mainLock; 797 mainLock.lock(); 798 try { 799 for (Worker w : workers) { 800 Thread t = w.thread; 801 if (!t.isInterrupted() && w.tryLock()) { 802 try { 803 t.interrupt(); 804 } catch (SecurityException ignore) { 805 } finally { 806 w.unlock(); 807 } 808 } 809 if (onlyOne) 810 break; 811 } 812 } finally { 813 mainLock.unlock(); 814 } 815 } 816 817 /** 818 * Common form of interruptIdleWorkers, to avoid having to 819 * remember what the boolean argument means. 
820 */ 821 private void interruptIdleWorkers() { 822 interruptIdleWorkers(false); 823 } 824 825 private static final boolean ONLY_ONE = true; 826 827 /* 828 * Misc utilities, most of which are also exported to 829 * ScheduledThreadPoolExecutor 830 */ 831 832 /** 833 * Invokes the rejected execution handler for the given command. 834 * Package-protected for use by ScheduledThreadPoolExecutor. 835 */ 836 final void reject(Runnable command) { 837 handler.rejectedExecution(command, this); 838 } 839 840 /** 841 * Performs any further cleanup following run state transition on 842 * invocation of shutdown. A no-op here, but used by 843 * ScheduledThreadPoolExecutor to cancel delayed tasks. 844 */ 845 void onShutdown() { 846 } 847 848 /** 849 * Drains the task queue into a new list, normally using 850 * drainTo. But if the queue is a DelayQueue or any other kind of 851 * queue for which poll or drainTo may fail to remove some 852 * elements, it deletes them one by one. 853 */ 854 private List<Runnable> drainQueue() { 855 BlockingQueue<Runnable> q = workQueue; 856 ArrayList<Runnable> taskList = new ArrayList<>(); 857 q.drainTo(taskList); 858 if (!q.isEmpty()) { 859 for (Runnable r : q.toArray(new Runnable[0])) { 860 if (q.remove(r)) 861 taskList.add(r); 862 } 863 } 864 return taskList; 865 } 866 867 /* 868 * Methods for creating, running and cleaning up after workers 869 */ 870 871 /** 872 * Checks if a new worker can be added with respect to current 873 * pool state and the given bound (either core or maximum). If so, 874 * the worker count is adjusted accordingly, and, if possible, a 875 * new worker is created and started, running firstTask as its 876 * first task. This method returns false if the pool is stopped or 877 * eligible to shut down. It also returns false if the thread 878 * factory fails to create a thread when asked. 
If the thread 879 * creation fails, either due to the thread factory returning 880 * null, or due to an exception (typically OutOfMemoryError in 881 * Thread.start()), we roll back cleanly. 882 * 883 * @param firstTask the task the new thread should run first (or 884 * null if none). Workers are created with an initial first task 885 * (in method execute()) to bypass queuing when there are fewer 886 * than corePoolSize threads (in which case we always start one), 887 * or when the queue is full (in which case we must bypass queue). 888 * Initially idle threads are usually created via 889 * prestartCoreThread or to replace other dying workers. 890 * 891 * @param core if true use corePoolSize as bound, else 892 * maximumPoolSize. (A boolean indicator is used here rather than a 893 * value to ensure reads of fresh values after checking other pool 894 * state). 895 * @return true if successful 896 */ 897 private boolean addWorker(Runnable firstTask, boolean core) { 898 retry: 899 for (;;) { 900 int c = ctl.get(); 901 int rs = runStateOf(c); 902 903 // Check if queue empty only if necessary. 904 if (rs >= SHUTDOWN && 905 ! (rs == SHUTDOWN && 906 firstTask == null && 907 ! workQueue.isEmpty())) 908 return false; 909 910 for (;;) { 911 int wc = workerCountOf(c); 912 if (wc >= CAPACITY || 913 wc >= (core ? corePoolSize : maximumPoolSize)) 914 return false; 915 if (compareAndIncrementWorkerCount(c)) 916 break retry; 917 c = ctl.get(); // Re-read ctl 918 if (runStateOf(c) != rs) 919 continue retry; 920 // else CAS failed due to workerCount change; retry inner loop 921 } 922 } 923 924 boolean workerStarted = false; 925 boolean workerAdded = false; 926 Worker w = null; 927 try { 928 w = new Worker(firstTask); 929 final Thread t = w.thread; 930 if (t != null) { 931 final ReentrantLock mainLock = this.mainLock; 932 mainLock.lock(); 933 try { 934 // Recheck while holding lock. 935 // Back out on ThreadFactory failure or if 936 // shut down before lock acquired. 
937 int rs = runStateOf(ctl.get()); 938 939 if (rs < SHUTDOWN || 940 (rs == SHUTDOWN && firstTask == null)) { 941 if (t.isAlive()) // precheck that t is startable 942 throw new IllegalThreadStateException(); 943 workers.add(w); 944 int s = workers.size(); 945 if (s > largestPoolSize) 946 largestPoolSize = s; 947 workerAdded = true; 948 } 949 } finally { 950 mainLock.unlock(); 951 } 952 if (workerAdded) { 953 t.start(); 954 workerStarted = true; 955 } 956 } 957 } finally { 958 if (! workerStarted) 959 addWorkerFailed(w); 960 } 961 return workerStarted; 962 } 963 964 /** 965 * Rolls back the worker thread creation. 966 * - removes worker from workers, if present 967 * - decrements worker count 968 * - rechecks for termination, in case the existence of this 969 * worker was holding up termination 970 */ 971 private void addWorkerFailed(Worker w) { 972 final ReentrantLock mainLock = this.mainLock; 973 mainLock.lock(); 974 try { 975 if (w != null) 976 workers.remove(w); 977 decrementWorkerCount(); 978 tryTerminate(); 979 } finally { 980 mainLock.unlock(); 981 } 982 } 983 984 /** 985 * Performs cleanup and bookkeeping for a dying worker. Called 986 * only from worker threads. Unless completedAbruptly is set, 987 * assumes that workerCount has already been adjusted to account 988 * for exit. This method removes thread from worker set, and 989 * possibly terminates the pool or replaces the worker if either 990 * it exited due to user task exception or if fewer than 991 * corePoolSize workers are running or queue is non-empty but 992 * there are no workers. 
993 * 994 * @param w the worker 995 * @param completedAbruptly if the worker died due to user exception 996 */ 997 private void processWorkerExit(Worker w, boolean completedAbruptly) { 998 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 999 decrementWorkerCount(); 1000 1001 final ReentrantLock mainLock = this.mainLock; 1002 mainLock.lock(); 1003 try { 1004 completedTaskCount += w.completedTasks; 1005 workers.remove(w); 1006 } finally { 1007 mainLock.unlock(); 1008 } 1009 1010 tryTerminate(); 1011 1012 int c = ctl.get(); 1013 if (runStateLessThan(c, STOP)) { 1014 if (!completedAbruptly) { 1015 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 1016 if (min == 0 && ! workQueue.isEmpty()) 1017 min = 1; 1018 if (workerCountOf(c) >= min) 1019 return; // replacement not needed 1020 } 1021 addWorker(null, false); 1022 } 1023 } 1024 1025 /** 1026 * Performs blocking or timed wait for a task, depending on 1027 * current configuration settings, or returns null if this worker 1028 * must exit because of any of: 1029 * 1. There are more than maximumPoolSize workers (due to 1030 * a call to setMaximumPoolSize). 1031 * 2. The pool is stopped. 1032 * 3. The pool is shutdown and the queue is empty. 1033 * 4. This worker timed out waiting for a task, and timed-out 1034 * workers are subject to termination (that is, 1035 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 1036 * both before and after the timed wait, and if the queue is 1037 * non-empty, this worker is not the last thread in the pool. 1038 * 1039 * @return task, or null if the worker must exit, in which case 1040 * workerCount is decremented 1041 */ 1042 private Runnable getTask() { 1043 boolean timedOut = false; // Did the last poll() time out? 1044 1045 for (;;) { 1046 int c = ctl.get(); 1047 int rs = runStateOf(c); 1048 1049 // Check if queue empty only if necessary. 
1050 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 1051 decrementWorkerCount(); 1052 return null; 1053 } 1054 1055 int wc = workerCountOf(c); 1056 1057 // Are workers subject to culling? 1058 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 1059 1060 if ((wc > maximumPoolSize || (timed && timedOut)) 1061 && (wc > 1 || workQueue.isEmpty())) { 1062 if (compareAndDecrementWorkerCount(c)) 1063 return null; 1064 continue; 1065 } 1066 1067 try { 1068 Runnable r = timed ? 1069 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 1070 workQueue.take(); 1071 if (r != null) 1072 return r; 1073 timedOut = true; 1074 } catch (InterruptedException retry) { 1075 timedOut = false; 1076 } 1077 } 1078 } 1079 1080 /** 1081 * Main worker run loop. Repeatedly gets tasks from queue and 1082 * executes them, while coping with a number of issues: 1083 * 1084 * 1. We may start out with an initial task, in which case we 1085 * don't need to get the first one. Otherwise, as long as pool is 1086 * running, we get tasks from getTask. If it returns null then the 1087 * worker exits due to changed pool state or configuration 1088 * parameters. Other exits result from exception throws in 1089 * external code, in which case completedAbruptly holds, which 1090 * usually leads processWorkerExit to replace this thread. 1091 * 1092 * 2. Before running any task, the lock is acquired to prevent 1093 * other pool interrupts while the task is executing, and then we 1094 * ensure that unless pool is stopping, this thread does not have 1095 * its interrupt set. 1096 * 1097 * 3. Each task run is preceded by a call to beforeExecute, which 1098 * might throw an exception, in which case we cause thread to die 1099 * (breaking loop with completedAbruptly true) without processing 1100 * the task. 1101 * 1102 * 4. Assuming beforeExecute completes normally, we run the task, 1103 * gathering any of its thrown exceptions to send to afterExecute. 
1104 * We separately handle RuntimeException, Error (both of which the 1105 * specs guarantee that we trap) and arbitrary Throwables. 1106 * Because we cannot rethrow Throwables within Runnable.run, we 1107 * wrap them within Errors on the way out (to the thread's 1108 * UncaughtExceptionHandler). Any thrown exception also 1109 * conservatively causes thread to die. 1110 * 1111 * 5. After task.run completes, we call afterExecute, which may 1112 * also throw an exception, which will also cause thread to 1113 * die. According to JLS Sec 14.20, this exception is the one that 1114 * will be in effect even if task.run throws. 1115 * 1116 * The net effect of the exception mechanics is that afterExecute 1117 * and the thread's UncaughtExceptionHandler have as accurate 1118 * information as we can provide about any problems encountered by 1119 * user code. 1120 * 1121 * @param w the worker 1122 */ 1123 final void runWorker(Worker w) { 1124 Thread wt = Thread.currentThread(); 1125 Runnable task = w.firstTask; 1126 w.firstTask = null; 1127 w.unlock(); // allow interrupts 1128 boolean completedAbruptly = true; 1129 try { 1130 while (task != null || (task = getTask()) != null) { 1131 w.lock(); 1132 // If pool is stopping, ensure thread is interrupted; 1133 // if not, ensure thread is not interrupted. 
This 1134 // requires a recheck in second case to deal with 1135 // shutdownNow race while clearing interrupt 1136 if ((runStateAtLeast(ctl.get(), STOP) || 1137 (Thread.interrupted() && 1138 runStateAtLeast(ctl.get(), STOP))) && 1139 !wt.isInterrupted()) 1140 wt.interrupt(); 1141 try { 1142 beforeExecute(wt, task); 1143 Throwable thrown = null; 1144 try { 1145 task.run(); 1146 } catch (RuntimeException x) { 1147 thrown = x; throw x; 1148 } catch (Error x) { 1149 thrown = x; throw x; 1150 } catch (Throwable x) { 1151 thrown = x; throw new Error(x); 1152 } finally { 1153 afterExecute(task, thrown); 1154 } 1155 } finally { 1156 task = null; 1157 w.completedTasks++; 1158 w.unlock(); 1159 } 1160 } 1161 completedAbruptly = false; 1162 } finally { 1163 processWorkerExit(w, completedAbruptly); 1164 } 1165 } 1166 1167 // Public constructors and methods 1168 1169 /** 1170 * Creates a new {@code ThreadPoolExecutor} with the given initial 1171 * parameters, the default thread factory and the default rejected 1172 * execution handler. 1173 * 1174 * <p>It may be more convenient to use one of the {@link Executors} 1175 * factory methods instead of this general purpose constructor. 1176 * 1177 * @param corePoolSize the number of threads to keep in the pool, even 1178 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1179 * @param maximumPoolSize the maximum number of threads to allow in the 1180 * pool 1181 * @param keepAliveTime when the number of threads is greater than 1182 * the core, this is the maximum time that excess idle threads 1183 * will wait for new tasks before terminating. 1184 * @param unit the time unit for the {@code keepAliveTime} argument 1185 * @param workQueue the queue to use for holding tasks before they are 1186 * executed. This queue will hold only the {@code Runnable} 1187 * tasks submitted by the {@code execute} method. 
1188 * @throws IllegalArgumentException if one of the following holds:<br> 1189 * {@code corePoolSize < 0}<br> 1190 * {@code keepAliveTime < 0}<br> 1191 * {@code maximumPoolSize <= 0}<br> 1192 * {@code maximumPoolSize < corePoolSize} 1193 * @throws NullPointerException if {@code workQueue} is null 1194 */ 1195 public ThreadPoolExecutor(int corePoolSize, 1196 int maximumPoolSize, 1197 long keepAliveTime, 1198 TimeUnit unit, 1199 BlockingQueue<Runnable> workQueue) { 1200 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1201 Executors.defaultThreadFactory(), defaultHandler); 1202 } 1203 1204 /** 1205 * Creates a new {@code ThreadPoolExecutor} with the given initial 1206 * parameters and {@linkplain ThreadPoolExecutor.AbortPolicy 1207 * default rejected execution handler}. 1208 * 1209 * @param corePoolSize the number of threads to keep in the pool, even 1210 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1211 * @param maximumPoolSize the maximum number of threads to allow in the 1212 * pool 1213 * @param keepAliveTime when the number of threads is greater than 1214 * the core, this is the maximum time that excess idle threads 1215 * will wait for new tasks before terminating. 1216 * @param unit the time unit for the {@code keepAliveTime} argument 1217 * @param workQueue the queue to use for holding tasks before they are 1218 * executed. This queue will hold only the {@code Runnable} 1219 * tasks submitted by the {@code execute} method. 
1220 * @param threadFactory the factory to use when the executor 1221 * creates a new thread 1222 * @throws IllegalArgumentException if one of the following holds:<br> 1223 * {@code corePoolSize < 0}<br> 1224 * {@code keepAliveTime < 0}<br> 1225 * {@code maximumPoolSize <= 0}<br> 1226 * {@code maximumPoolSize < corePoolSize} 1227 * @throws NullPointerException if {@code workQueue} 1228 * or {@code threadFactory} is null 1229 */ 1230 public ThreadPoolExecutor(int corePoolSize, 1231 int maximumPoolSize, 1232 long keepAliveTime, 1233 TimeUnit unit, 1234 BlockingQueue<Runnable> workQueue, 1235 ThreadFactory threadFactory) { 1236 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1237 threadFactory, defaultHandler); 1238 } 1239 1240 /** 1241 * Creates a new {@code ThreadPoolExecutor} with the given initial 1242 * parameters and 1243 * {@linkplain Executors#defaultThreadFactory default thread factory}. 1244 * 1245 * @param corePoolSize the number of threads to keep in the pool, even 1246 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1247 * @param maximumPoolSize the maximum number of threads to allow in the 1248 * pool 1249 * @param keepAliveTime when the number of threads is greater than 1250 * the core, this is the maximum time that excess idle threads 1251 * will wait for new tasks before terminating. 1252 * @param unit the time unit for the {@code keepAliveTime} argument 1253 * @param workQueue the queue to use for holding tasks before they are 1254 * executed. This queue will hold only the {@code Runnable} 1255 * tasks submitted by the {@code execute} method. 
1256 * @param handler the handler to use when execution is blocked 1257 * because the thread bounds and queue capacities are reached 1258 * @throws IllegalArgumentException if one of the following holds:<br> 1259 * {@code corePoolSize < 0}<br> 1260 * {@code keepAliveTime < 0}<br> 1261 * {@code maximumPoolSize <= 0}<br> 1262 * {@code maximumPoolSize < corePoolSize} 1263 * @throws NullPointerException if {@code workQueue} 1264 * or {@code handler} is null 1265 */ 1266 public ThreadPoolExecutor(int corePoolSize, 1267 int maximumPoolSize, 1268 long keepAliveTime, 1269 TimeUnit unit, 1270 BlockingQueue<Runnable> workQueue, 1271 RejectedExecutionHandler handler) { 1272 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1273 Executors.defaultThreadFactory(), handler); 1274 } 1275 1276 /** 1277 * Creates a new {@code ThreadPoolExecutor} with the given initial 1278 * parameters. 1279 * 1280 * @param corePoolSize the number of threads to keep in the pool, even 1281 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1282 * @param maximumPoolSize the maximum number of threads to allow in the 1283 * pool 1284 * @param keepAliveTime when the number of threads is greater than 1285 * the core, this is the maximum time that excess idle threads 1286 * will wait for new tasks before terminating. 1287 * @param unit the time unit for the {@code keepAliveTime} argument 1288 * @param workQueue the queue to use for holding tasks before they are 1289 * executed. This queue will hold only the {@code Runnable} 1290 * tasks submitted by the {@code execute} method. 
1291 * @param threadFactory the factory to use when the executor 1292 * creates a new thread 1293 * @param handler the handler to use when execution is blocked 1294 * because the thread bounds and queue capacities are reached 1295 * @throws IllegalArgumentException if one of the following holds:<br> 1296 * {@code corePoolSize < 0}<br> 1297 * {@code keepAliveTime < 0}<br> 1298 * {@code maximumPoolSize <= 0}<br> 1299 * {@code maximumPoolSize < corePoolSize} 1300 * @throws NullPointerException if {@code workQueue} 1301 * or {@code threadFactory} or {@code handler} is null 1302 */ 1303 public ThreadPoolExecutor(int corePoolSize, 1304 int maximumPoolSize, 1305 long keepAliveTime, 1306 TimeUnit unit, 1307 BlockingQueue<Runnable> workQueue, 1308 ThreadFactory threadFactory, 1309 RejectedExecutionHandler handler) { 1310 if (corePoolSize < 0 || 1311 maximumPoolSize <= 0 || 1312 maximumPoolSize < corePoolSize || 1313 keepAliveTime < 0) 1314 throw new IllegalArgumentException(); 1315 if (workQueue == null || threadFactory == null || handler == null) 1316 throw new NullPointerException(); 1317 this.acc = (System.getSecurityManager() == null) 1318 ? null 1319 : AccessController.getContext(); 1320 this.corePoolSize = corePoolSize; 1321 this.maximumPoolSize = maximumPoolSize; 1322 this.workQueue = workQueue; 1323 this.keepAliveTime = unit.toNanos(keepAliveTime); 1324 this.threadFactory = threadFactory; 1325 this.handler = handler; 1326 } 1327 1328 /** 1329 * Executes the given task sometime in the future. The task 1330 * may execute in a new thread or in an existing pooled thread. 1331 * 1332 * If the task cannot be submitted for execution, either because this 1333 * executor has been shutdown or because its capacity has been reached, 1334 * the task is handled by the current {@code RejectedExecutionHandler}. 
1335 * 1336 * @param command the task to execute 1337 * @throws RejectedExecutionException at discretion of 1338 * {@code RejectedExecutionHandler}, if the task 1339 * cannot be accepted for execution 1340 * @throws NullPointerException if {@code command} is null 1341 */ 1342 public void execute(Runnable command) { 1343 if (command == null) 1344 throw new NullPointerException(); 1345 /* 1346 * Proceed in 3 steps: 1347 * 1348 * 1. If fewer than corePoolSize threads are running, try to 1349 * start a new thread with the given command as its first 1350 * task. The call to addWorker atomically checks runState and 1351 * workerCount, and so prevents false alarms that would add 1352 * threads when it shouldn't, by returning false. 1353 * 1354 * 2. If a task can be successfully queued, then we still need 1355 * to double-check whether we should have added a thread 1356 * (because existing ones died since last checking) or that 1357 * the pool shut down since entry into this method. So we 1358 * recheck state and if necessary roll back the enqueuing if 1359 * stopped, or start a new thread if there are none. 1360 * 1361 * 3. If we cannot queue task, then we try to add a new 1362 * thread. If it fails, we know we are shut down or saturated 1363 * and so reject the task. 1364 */ 1365 int c = ctl.get(); 1366 if (workerCountOf(c) < corePoolSize) { 1367 if (addWorker(command, true)) 1368 return; 1369 c = ctl.get(); 1370 } 1371 if (isRunning(c) && workQueue.offer(command)) { 1372 int recheck = ctl.get(); 1373 if (! isRunning(recheck) && remove(command)) 1374 reject(command); 1375 else if (workerCountOf(recheck) == 0) 1376 addWorker(null, false); 1377 } 1378 else if (!addWorker(command, false)) 1379 reject(command); 1380 } 1381 1382 /** 1383 * Initiates an orderly shutdown in which previously submitted 1384 * tasks are executed, but no new tasks will be accepted. 1385 * Invocation has no additional effect if already shut down. 
1386 * 1387 * <p>This method does not wait for previously submitted tasks to 1388 * complete execution. Use {@link #awaitTermination awaitTermination} 1389 * to do that. 1390 * 1391 * @throws SecurityException {@inheritDoc} 1392 */ 1393 public void shutdown() { 1394 final ReentrantLock mainLock = this.mainLock; 1395 mainLock.lock(); 1396 try { 1397 checkShutdownAccess(); 1398 advanceRunState(SHUTDOWN); 1399 interruptIdleWorkers(); 1400 onShutdown(); // hook for ScheduledThreadPoolExecutor 1401 } finally { 1402 mainLock.unlock(); 1403 } 1404 tryTerminate(); 1405 } 1406 1407 /** 1408 * Attempts to stop all actively executing tasks, halts the 1409 * processing of waiting tasks, and returns a list of the tasks 1410 * that were awaiting execution. These tasks are drained (removed) 1411 * from the task queue upon return from this method. 1412 * 1413 * <p>This method does not wait for actively executing tasks to 1414 * terminate. Use {@link #awaitTermination awaitTermination} to 1415 * do that. 1416 * 1417 * <p>There are no guarantees beyond best-effort attempts to stop 1418 * processing actively executing tasks. This implementation 1419 * interrupts tasks via {@link Thread#interrupt}; any task that 1420 * fails to respond to interrupts may never terminate. 1421 * 1422 * @throws SecurityException {@inheritDoc} 1423 */ 1424 public List<Runnable> shutdownNow() { 1425 List<Runnable> tasks; 1426 final ReentrantLock mainLock = this.mainLock; 1427 mainLock.lock(); 1428 try { 1429 checkShutdownAccess(); 1430 advanceRunState(STOP); 1431 interruptWorkers(); 1432 tasks = drainQueue(); 1433 } finally { 1434 mainLock.unlock(); 1435 } 1436 tryTerminate(); 1437 return tasks; 1438 } 1439 1440 public boolean isShutdown() { 1441 return ! isRunning(ctl.get()); 1442 } 1443 1444 /** Used by ScheduledThreadPoolExecutor. 
*/ 1445 boolean isStopped() { 1446 return runStateAtLeast(ctl.get(), STOP); 1447 } 1448 1449 /** 1450 * Returns true if this executor is in the process of terminating 1451 * after {@link #shutdown} or {@link #shutdownNow} but has not 1452 * completely terminated. This method may be useful for 1453 * debugging. A return of {@code true} reported a sufficient 1454 * period after shutdown may indicate that submitted tasks have 1455 * ignored or suppressed interruption, causing this executor not 1456 * to properly terminate. 1457 * 1458 * @return {@code true} if terminating but not yet terminated 1459 */ 1460 public boolean isTerminating() { 1461 int c = ctl.get(); 1462 return ! isRunning(c) && runStateLessThan(c, TERMINATED); 1463 } 1464 1465 public boolean isTerminated() { 1466 return runStateAtLeast(ctl.get(), TERMINATED); 1467 } 1468 1469 public boolean awaitTermination(long timeout, TimeUnit unit) 1470 throws InterruptedException { 1471 long nanos = unit.toNanos(timeout); 1472 final ReentrantLock mainLock = this.mainLock; 1473 mainLock.lock(); 1474 try { 1475 while (!runStateAtLeast(ctl.get(), TERMINATED)) { 1476 if (nanos <= 0L) 1477 return false; 1478 nanos = termination.awaitNanos(nanos); 1479 } 1480 return true; 1481 } finally { 1482 mainLock.unlock(); 1483 } 1484 } 1485 1486 /** 1487 * Invokes {@code shutdown} when this executor is no longer 1488 * referenced and it has no threads. 1489 * 1490 * <p>This method is invoked with privileges that are restricted by 1491 * the security context of the caller that invokes the constructor. 1492 * 1493 * @deprecated The {@code finalize} method has been deprecated. 1494 * Subclasses that override {@code finalize} in order to perform cleanup 1495 * should be modified to use alternative cleanup mechanisms and 1496 * to remove the overriding {@code finalize} method. 
1497 * When overriding the {@code finalize} method, its implementation must explicitly 1498 * ensure that {@code super.finalize()} is invoked as described in {@link Object#finalize}. 1499 * See the specification for {@link Object#finalize()} for further 1500 * information about migration options. 1501 */ 1502 @Deprecated(since="9") 1503 protected void finalize() { 1504 SecurityManager sm = System.getSecurityManager(); 1505 if (sm == null || acc == null) { 1506 shutdown(); 1507 } else { 1508 PrivilegedAction<Void> pa = () -> { shutdown(); return null; }; 1509 AccessController.doPrivileged(pa, acc); 1510 } 1511 } 1512 1513 /** 1514 * Sets the thread factory used to create new threads. 1515 * 1516 * @param threadFactory the new thread factory 1517 * @throws NullPointerException if threadFactory is null 1518 * @see #getThreadFactory 1519 */ 1520 public void setThreadFactory(ThreadFactory threadFactory) { 1521 if (threadFactory == null) 1522 throw new NullPointerException(); 1523 this.threadFactory = threadFactory; 1524 } 1525 1526 /** 1527 * Returns the thread factory used to create new threads. 1528 * 1529 * @return the current thread factory 1530 * @see #setThreadFactory(ThreadFactory) 1531 */ 1532 public ThreadFactory getThreadFactory() { 1533 return threadFactory; 1534 } 1535 1536 /** 1537 * Sets a new handler for unexecutable tasks. 1538 * 1539 * @param handler the new handler 1540 * @throws NullPointerException if handler is null 1541 * @see #getRejectedExecutionHandler 1542 */ 1543 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 1544 if (handler == null) 1545 throw new NullPointerException(); 1546 this.handler = handler; 1547 } 1548 1549 /** 1550 * Returns the current handler for unexecutable tasks. 
1551 * 1552 * @return the current handler 1553 * @see #setRejectedExecutionHandler(RejectedExecutionHandler) 1554 */ 1555 public RejectedExecutionHandler getRejectedExecutionHandler() { 1556 return handler; 1557 } 1558 1559 /** 1560 * Sets the core number of threads. This overrides any value set 1561 * in the constructor. If the new value is smaller than the 1562 * current value, excess existing threads will be terminated when 1563 * they next become idle. If larger, new threads will, if needed, 1564 * be started to execute any queued tasks. 1565 * 1566 * @param corePoolSize the new core size 1567 * @throws IllegalArgumentException if {@code corePoolSize < 0} 1568 * or {@code corePoolSize} is greater than the {@linkplain 1569 * #getMaximumPoolSize() maximum pool size} 1570 * @see #getCorePoolSize 1571 */ 1572 public void setCorePoolSize(int corePoolSize) { 1573 if (corePoolSize < 0 || maximumPoolSize < corePoolSize) 1574 throw new IllegalArgumentException(); 1575 int delta = corePoolSize - this.corePoolSize; 1576 this.corePoolSize = corePoolSize; 1577 if (workerCountOf(ctl.get()) > corePoolSize) 1578 interruptIdleWorkers(); 1579 else if (delta > 0) { 1580 // We don't really know how many new threads are "needed". 1581 // As a heuristic, prestart enough new workers (up to new 1582 // core size) to handle the current number of tasks in 1583 // queue, but stop if queue becomes empty while doing so. 1584 int k = Math.min(delta, workQueue.size()); 1585 while (k-- > 0 && addWorker(null, true)) { 1586 if (workQueue.isEmpty()) 1587 break; 1588 } 1589 } 1590 } 1591 1592 /** 1593 * Returns the core number of threads. 1594 * 1595 * @return the core number of threads 1596 * @see #setCorePoolSize 1597 */ 1598 public int getCorePoolSize() { 1599 return corePoolSize; 1600 } 1601 1602 /** 1603 * Starts a core thread, causing it to idly wait for work. This 1604 * overrides the default policy of starting core threads only when 1605 * new tasks are executed. 
This method will return {@code false} 1606 * if all core threads have already been started. 1607 * 1608 * @return {@code true} if a thread was started 1609 */ 1610 public boolean prestartCoreThread() { 1611 return workerCountOf(ctl.get()) < corePoolSize && 1612 addWorker(null, true); 1613 } 1614 1615 /** 1616 * Same as prestartCoreThread except arranges that at least one 1617 * thread is started even if corePoolSize is 0. 1618 */ 1619 void ensurePrestart() { 1620 int wc = workerCountOf(ctl.get()); 1621 if (wc < corePoolSize) 1622 addWorker(null, true); 1623 else if (wc == 0) 1624 addWorker(null, false); 1625 } 1626 1627 /** 1628 * Starts all core threads, causing them to idly wait for work. This 1629 * overrides the default policy of starting core threads only when 1630 * new tasks are executed. 1631 * 1632 * @return the number of threads started 1633 */ 1634 public int prestartAllCoreThreads() { 1635 int n = 0; 1636 while (addWorker(null, true)) 1637 ++n; 1638 return n; 1639 } 1640 1641 /** 1642 * Returns true if this pool allows core threads to time out and 1643 * terminate if no tasks arrive within the keepAlive time, being 1644 * replaced if needed when new tasks arrive. When true, the same 1645 * keep-alive policy applying to non-core threads applies also to 1646 * core threads. When false (the default), core threads are never 1647 * terminated due to lack of incoming tasks. 1648 * 1649 * @return {@code true} if core threads are allowed to time out, 1650 * else {@code false} 1651 * 1652 * @since 1.6 1653 */ 1654 public boolean allowsCoreThreadTimeOut() { 1655 return allowCoreThreadTimeOut; 1656 } 1657 1658 /** 1659 * Sets the policy governing whether core threads may time out and 1660 * terminate if no tasks arrive within the keep-alive time, being 1661 * replaced if needed when new tasks arrive. When false, core 1662 * threads are never terminated due to lack of incoming 1663 * tasks. 
When true, the same keep-alive policy applying to 1664 * non-core threads applies also to core threads. To avoid 1665 * continual thread replacement, the keep-alive time must be 1666 * greater than zero when setting {@code true}. This method 1667 * should in general be called before the pool is actively used. 1668 * 1669 * @param value {@code true} if should time out, else {@code false} 1670 * @throws IllegalArgumentException if value is {@code true} 1671 * and the current keep-alive time is not greater than zero 1672 * 1673 * @since 1.6 1674 */ 1675 public void allowCoreThreadTimeOut(boolean value) { 1676 if (value && keepAliveTime <= 0) 1677 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1678 if (value != allowCoreThreadTimeOut) { 1679 allowCoreThreadTimeOut = value; 1680 if (value) 1681 interruptIdleWorkers(); 1682 } 1683 } 1684 1685 /** 1686 * Sets the maximum allowed number of threads. This overrides any 1687 * value set in the constructor. If the new value is smaller than 1688 * the current value, excess existing threads will be 1689 * terminated when they next become idle. 1690 * 1691 * @param maximumPoolSize the new maximum 1692 * @throws IllegalArgumentException if the new maximum is 1693 * less than or equal to zero, or 1694 * less than the {@linkplain #getCorePoolSize core pool size} 1695 * @see #getMaximumPoolSize 1696 */ 1697 public void setMaximumPoolSize(int maximumPoolSize) { 1698 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) 1699 throw new IllegalArgumentException(); 1700 this.maximumPoolSize = maximumPoolSize; 1701 if (workerCountOf(ctl.get()) > maximumPoolSize) 1702 interruptIdleWorkers(); 1703 } 1704 1705 /** 1706 * Returns the maximum allowed number of threads. 
1707 * 1708 * @return the maximum allowed number of threads 1709 * @see #setMaximumPoolSize 1710 */ 1711 public int getMaximumPoolSize() { 1712 return maximumPoolSize; 1713 } 1714 1715 /** 1716 * Sets the thread keep-alive time, which is the amount of time 1717 * that threads may remain idle before being terminated. 1718 * Threads that wait this amount of time without processing a 1719 * task will be terminated if there are more than the core 1720 * number of threads currently in the pool, or if this pool 1721 * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}. 1722 * This overrides any value set in the constructor. 1723 * 1724 * @param time the time to wait. A time value of zero will cause 1725 * excess threads to terminate immediately after executing tasks. 1726 * @param unit the time unit of the {@code time} argument 1727 * @throws IllegalArgumentException if {@code time} less than zero or 1728 * if {@code time} is zero and {@code allowsCoreThreadTimeOut} 1729 * @see #getKeepAliveTime(TimeUnit) 1730 */ 1731 public void setKeepAliveTime(long time, TimeUnit unit) { 1732 if (time < 0) 1733 throw new IllegalArgumentException(); 1734 if (time == 0 && allowsCoreThreadTimeOut()) 1735 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1736 long keepAliveTime = unit.toNanos(time); 1737 long delta = keepAliveTime - this.keepAliveTime; 1738 this.keepAliveTime = keepAliveTime; 1739 if (delta < 0) 1740 interruptIdleWorkers(); 1741 } 1742 1743 /** 1744 * Returns the thread keep-alive time, which is the amount of time 1745 * that threads may remain idle before being terminated. 1746 * Threads that wait this amount of time without processing a 1747 * task will be terminated if there are more than the core 1748 * number of threads currently in the pool, or if this pool 1749 * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}. 
1750 * 1751 * @param unit the desired time unit of the result 1752 * @return the time limit 1753 * @see #setKeepAliveTime(long, TimeUnit) 1754 */ 1755 public long getKeepAliveTime(TimeUnit unit) { 1756 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 1757 } 1758 1759 /* User-level queue utilities */ 1760 1761 /** 1762 * Returns the task queue used by this executor. Access to the 1763 * task queue is intended primarily for debugging and monitoring. 1764 * This queue may be in active use. Retrieving the task queue 1765 * does not prevent queued tasks from executing. 1766 * 1767 * @return the task queue 1768 */ 1769 public BlockingQueue<Runnable> getQueue() { 1770 return workQueue; 1771 } 1772 1773 /** 1774 * Removes this task from the executor's internal queue if it is 1775 * present, thus causing it not to be run if it has not already 1776 * started. 1777 * 1778 * <p>This method may be useful as one part of a cancellation 1779 * scheme. It may fail to remove tasks that have been converted 1780 * into other forms before being placed on the internal queue. 1781 * For example, a task entered using {@code submit} might be 1782 * converted into a form that maintains {@code Future} status. 1783 * However, in such cases, method {@link #purge} may be used to 1784 * remove those Futures that have been cancelled. 1785 * 1786 * @param task the task to remove 1787 * @return {@code true} if the task was removed 1788 */ 1789 public boolean remove(Runnable task) { 1790 boolean removed = workQueue.remove(task); 1791 tryTerminate(); // In case SHUTDOWN and now empty 1792 return removed; 1793 } 1794 1795 /** 1796 * Tries to remove from the work queue all {@link Future} 1797 * tasks that have been cancelled. This method can be useful as a 1798 * storage reclamation operation, that has no other impact on 1799 * functionality. Cancelled tasks are never executed, but may 1800 * accumulate in work queues until worker threads can actively 1801 * remove them. 
Invoking this method instead tries to remove them now. 1802 * However, this method may fail to remove tasks in 1803 * the presence of interference by other threads. 1804 */ 1805 public void purge() { 1806 final BlockingQueue<Runnable> q = workQueue; 1807 try { 1808 Iterator<Runnable> it = q.iterator(); 1809 while (it.hasNext()) { 1810 Runnable r = it.next(); 1811 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1812 it.remove(); 1813 } 1814 } catch (ConcurrentModificationException fallThrough) { 1815 // Take slow path if we encounter interference during traversal. 1816 // Make copy for traversal and call remove for cancelled entries. 1817 // The slow path is more likely to be O(N*N). 1818 for (Object r : q.toArray()) 1819 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1820 q.remove(r); 1821 } 1822 1823 tryTerminate(); // In case SHUTDOWN and now empty 1824 } 1825 1826 /* Statistics */ 1827 1828 /** 1829 * Returns the current number of threads in the pool. 1830 * 1831 * @return the number of threads 1832 */ 1833 public int getPoolSize() { 1834 final ReentrantLock mainLock = this.mainLock; 1835 mainLock.lock(); 1836 try { 1837 // Remove rare and surprising possibility of 1838 // isTerminated() && getPoolSize() > 0 1839 return runStateAtLeast(ctl.get(), TIDYING) ? 0 1840 : workers.size(); 1841 } finally { 1842 mainLock.unlock(); 1843 } 1844 } 1845 1846 /** 1847 * Returns the approximate number of threads that are actively 1848 * executing tasks. 1849 * 1850 * @return the number of threads 1851 */ 1852 public int getActiveCount() { 1853 final ReentrantLock mainLock = this.mainLock; 1854 mainLock.lock(); 1855 try { 1856 int n = 0; 1857 for (Worker w : workers) 1858 if (w.isLocked()) 1859 ++n; 1860 return n; 1861 } finally { 1862 mainLock.unlock(); 1863 } 1864 } 1865 1866 /** 1867 * Returns the largest number of threads that have ever 1868 * simultaneously been in the pool. 
1869 * 1870 * @return the number of threads 1871 */ 1872 public int getLargestPoolSize() { 1873 final ReentrantLock mainLock = this.mainLock; 1874 mainLock.lock(); 1875 try { 1876 return largestPoolSize; 1877 } finally { 1878 mainLock.unlock(); 1879 } 1880 } 1881 1882 /** 1883 * Returns the approximate total number of tasks that have ever been 1884 * scheduled for execution. Because the states of tasks and 1885 * threads may change dynamically during computation, the returned 1886 * value is only an approximation. 1887 * 1888 * @return the number of tasks 1889 */ 1890 public long getTaskCount() { 1891 final ReentrantLock mainLock = this.mainLock; 1892 mainLock.lock(); 1893 try { 1894 long n = completedTaskCount; 1895 for (Worker w : workers) { 1896 n += w.completedTasks; 1897 if (w.isLocked()) 1898 ++n; 1899 } 1900 return n + workQueue.size(); 1901 } finally { 1902 mainLock.unlock(); 1903 } 1904 } 1905 1906 /** 1907 * Returns the approximate total number of tasks that have 1908 * completed execution. Because the states of tasks and threads 1909 * may change dynamically during computation, the returned value 1910 * is only an approximation, but one that does not ever decrease 1911 * across successive calls. 1912 * 1913 * @return the number of tasks 1914 */ 1915 public long getCompletedTaskCount() { 1916 final ReentrantLock mainLock = this.mainLock; 1917 mainLock.lock(); 1918 try { 1919 long n = completedTaskCount; 1920 for (Worker w : workers) 1921 n += w.completedTasks; 1922 return n; 1923 } finally { 1924 mainLock.unlock(); 1925 } 1926 } 1927 1928 /** 1929 * Returns a string identifying this pool, as well as its state, 1930 * including indications of run state and estimated worker and 1931 * task counts. 
1932 * 1933 * @return a string identifying this pool, as well as its state 1934 */ 1935 public String toString() { 1936 long ncompleted; 1937 int nworkers, nactive; 1938 final ReentrantLock mainLock = this.mainLock; 1939 mainLock.lock(); 1940 try { 1941 ncompleted = completedTaskCount; 1942 nactive = 0; 1943 nworkers = workers.size(); 1944 for (Worker w : workers) { 1945 ncompleted += w.completedTasks; 1946 if (w.isLocked()) 1947 ++nactive; 1948 } 1949 } finally { 1950 mainLock.unlock(); 1951 } 1952 int c = ctl.get(); 1953 String runState = 1954 runStateLessThan(c, SHUTDOWN) ? "Running" : 1955 runStateAtLeast(c, TERMINATED) ? "Terminated" : 1956 "Shutting down"; 1957 return super.toString() + 1958 "[" + runState + 1959 ", pool size = " + nworkers + 1960 ", active threads = " + nactive + 1961 ", queued tasks = " + workQueue.size() + 1962 ", completed tasks = " + ncompleted + 1963 "]"; 1964 } 1965 1966 /* Extension hooks */ 1967 1968 /** 1969 * Method invoked prior to executing the given Runnable in the 1970 * given thread. This method is invoked by thread {@code t} that 1971 * will execute task {@code r}, and may be used to re-initialize 1972 * ThreadLocals, or to perform logging. 1973 * 1974 * <p>This implementation does nothing, but may be customized in 1975 * subclasses. Note: To properly nest multiple overridings, subclasses 1976 * should generally invoke {@code super.beforeExecute} at the end of 1977 * this method. 1978 * 1979 * @param t the thread that will run task {@code r} 1980 * @param r the task that will be executed 1981 */ 1982 protected void beforeExecute(Thread t, Runnable r) { } 1983 1984 /** 1985 * Method invoked upon completion of execution of the given Runnable. 1986 * This method is invoked by the thread that executed the task. If 1987 * non-null, the Throwable is the uncaught {@code RuntimeException} 1988 * or {@code Error} that caused execution to terminate abruptly. 
1989 * 1990 * <p>This implementation does nothing, but may be customized in 1991 * subclasses. Note: To properly nest multiple overridings, subclasses 1992 * should generally invoke {@code super.afterExecute} at the 1993 * beginning of this method. 1994 * 1995 * <p><b>Note:</b> When actions are enclosed in tasks (such as 1996 * {@link FutureTask}) either explicitly or via methods such as 1997 * {@code submit}, these task objects catch and maintain 1998 * computational exceptions, and so they do not cause abrupt 1999 * termination, and the internal exceptions are <em>not</em> 2000 * passed to this method. If you would like to trap both kinds of 2001 * failures in this method, you can further probe for such cases, 2002 * as in this sample subclass that prints either the direct cause 2003 * or the underlying exception if a task has been aborted: 2004 * 2005 * <pre> {@code 2006 * class ExtendedExecutor extends ThreadPoolExecutor { 2007 * // ... 2008 * protected void afterExecute(Runnable r, Throwable t) { 2009 * super.afterExecute(r, t); 2010 * if (t == null 2011 * && r instanceof Future<?> 2012 * && ((Future<?>)r).isDone()) { 2013 * try { 2014 * Object result = ((Future<?>) r).get(); 2015 * } catch (CancellationException ce) { 2016 * t = ce; 2017 * } catch (ExecutionException ee) { 2018 * t = ee.getCause(); 2019 * } catch (InterruptedException ie) { 2020 * // ignore/reset 2021 * Thread.currentThread().interrupt(); 2022 * } 2023 * } 2024 * if (t != null) 2025 * System.out.println(t); 2026 * } 2027 * }}</pre> 2028 * 2029 * @param r the runnable that has completed 2030 * @param t the exception that caused termination, or null if 2031 * execution completed normally 2032 */ 2033 protected void afterExecute(Runnable r, Throwable t) { } 2034 2035 /** 2036 * Method invoked when the Executor has terminated. Default 2037 * implementation does nothing. 
Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * {@code super.terminated} within this method.
     */
    protected void terminated() {
        // Intentionally empty: extension point for subclasses.
    }

    /* Predefined RejectedExecutionHandlers */

    /**
     * A rejection handler that runs the rejected task directly in
     * the calling thread of the {@code execute} method. If the
     * executor has been shut down, the task is discarded instead.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() {
        }

        /**
         * Runs task r in the caller's thread, or discards it when
         * the executor has been shut down.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (e.isShutdown())
                return;
            r.run();
        }
    }

    /**
     * A rejection handler that throws a
     * {@link RejectedExecutionException}.
     *
     * This is the default handler for {@link ThreadPoolExecutor} and
     * {@link ScheduledThreadPoolExecutor}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() {
        }

        /**
         * Always throws RejectedExecutionException, with a message
         * naming the task and the executor.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            final String message =
                "Task " + r.toString() + " rejected from " + e.toString();
            throw new RejectedExecutionException(message);
        }
    }

    /**
     * A rejection handler that silently discards the rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() {
        }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // Deliberately empty: the task is dropped.
        }
    }

    /**
     * A rejection handler that discards the oldest unhandled request
     * and then retries {@code execute}. If the executor is shut
     * down, the rejected task is discarded instead.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy}.
         */
        public DiscardOldestPolicy() {
        }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r. If the executor is
         * shut down, task r is discarded instead.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (e.isShutdown())
                return;
            // Drop the head of the queue (if any), then resubmit r.
            e.getQueue().poll();
            e.execute(r);
        }
    }
}