1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.  Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25/*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36package java.util.concurrent;
37
38import java.lang.invoke.MethodHandles;
39import java.lang.invoke.VarHandle;
40import java.util.concurrent.locks.LockSupport;
41
/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric ordering of the constants below is significant:
     * the code relies on range comparisons such as
     * {@code s <= COMPLETING}, {@code s > COMPLETING},
     * {@code s >= CANCELLED} and {@code s >= INTERRUPTING}.
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome, when {@code s} is NORMAL
     * @throws CancellationException if {@code s} is CANCELLED,
     *         INTERRUPTING or INTERRUPTED
     * @throws ExecutionException wrapping the stored outcome for any
     *         other state value (EXCEPTIONAL in practice)
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        // Callers obtain s from a volatile read of state before calling,
        // which is what makes this plain read of outcome safe.
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Returns {@code true} if this task's state is CANCELLED,
     * INTERRUPTING or INTERRUPTED, i.e. a call to {@link #cancel}
     * won the transition out of NEW.
     *
     * @return {@code true} if this task was cancelled
     */
    @Override
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    /**
     * Returns {@code true} as soon as the state has left NEW.  Note
     * that this includes the transient COMPLETING and INTERRUPTING
     * states, which is why {@code awaitDone} never returns
     * empty-handed while the state is COMPLETING.
     *
     * @return {@code true} if this task is no longer in its initial state
     */
    @Override
    public boolean isDone() {
        return state != NEW;
    }

    /**
     * Attempts to cancel this task.  The attempt succeeds only if the
     * state is still NEW and this caller wins the CAS out of NEW;
     * otherwise nothing is changed and {@code false} is returned.
     *
     * <p>When {@code mayInterruptIfRunning} is true the state first
     * becomes INTERRUPTING, the current runner thread (if any) is
     * interrupted, and the state is then published as INTERRUPTED.
     * Otherwise the state goes directly to CANCELLED.  In both cases
     * waiters are released and {@link #done} is invoked.
     *
     * @param mayInterruptIfRunning whether the thread currently
     *        running this task, if any, should be interrupted
     * @return {@code true} if this call cancelled the task
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // Cheap volatile pre-check first; the CAS decides the winner.
        if (!(state == NEW && STATE.compareAndSet
              (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // Must leave INTERRUPTING even if interrupt() throws,
                    // since the runner spins while state == INTERRUPTING.
                    STATE.setRelease(this, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Blocks in {@code awaitDone} while the state is NEW or
     * COMPLETING, then reports the outcome.
     *
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Throws {@code TimeoutException} if the state is still NEW or
     * COMPLETING when the timed wait returns.
     *
     * @throws CancellationException {@inheritDoc}
     * @throws NullPointerException if {@code unit} is null
     */
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        // Winning NEW -> COMPLETING grants exclusive right to write outcome.
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = v;
            STATE.setRelease(this, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        // Winning NEW -> COMPLETING grants exclusive right to write outcome.
        if (STATE.compareAndSet(this, NEW, COMPLETING)) {
            outcome = t;
            STATE.setRelease(this, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    /**
     * Runs the underlying callable and records its result via
     * {@link #set}, or the throwable it raised via
     * {@link #setException}.  Returns immediately without running if
     * the state is not NEW or if another thread has already claimed
     * the {@code runner} slot.
     */
    @Override
    public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            // Re-check state: a cancel may have won between the check
            // above and the runner CAS.
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // The state never left NEW on the success path, so "reset"
        // simply means it is still NEW here.
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller; INTERRUPTING or
     *        INTERRUPTED
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     *
     * <p>A node captures the constructing thread; its {@code thread}
     * field is nulled to mark the node as released or abandoned.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // weakCompareAndSet may fail spuriously; the enclosing loop
            // re-reads waiters and retries.
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                // We now own the detached stack; wake every waiter.
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is found
     *         interrupted while the state is still NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        // Each iteration performs at most one step (allocate node,
        // enqueue, or park) and then re-reads state from the top.
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                // Push our node onto the stack head; on failure we loop
                // and retry with a fresh read of waiters.
                queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L; // keep 0L reserved as "not yet parked"
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to unlink; ignored if null
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // Mark the node dead; the sweep below unlinks every node
            // whose thread is null, not just this one.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // Dead node at the head: must CAS the stack top.
                    else if (!WAITERS.compareAndSet(this, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    // VarHandle mechanics
    private static final VarHandle STATE;
    private static final VarHandle RUNNER;
    private static final VarHandle WAITERS;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(FutureTask.class, "state", int.class);
            RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
            WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        // (the local is intentionally unused; the class literal is the point)
        Class<?> ensureLoaded = LockSupport.class;
    }

}
503