1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 * version 2 for more details (a copy is included in the LICENSE file that
12 * accompanied this code).
13 *
14 * You should have received a copy of the GNU General Public License version
15 * 2 along with this work; if not, write to the Free Software Foundation,
16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23/*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/publicdomain/zero/1.0/
32 * Other contributors include Andrew Wright, Jeffrey Hayes,
33 * Pat Fisher, Mike Judd.
34 */
35
36import static java.util.concurrent.TimeUnit.MILLISECONDS;
37
38import java.util.concurrent.ArrayBlockingQueue;
39import java.util.concurrent.Callable;
40import java.util.concurrent.CompletionService;
41import java.util.concurrent.CountDownLatch;
42import java.util.concurrent.ExecutionException;
43import java.util.concurrent.ExecutorCompletionService;
44import java.util.concurrent.ExecutorService;
45import java.util.concurrent.Future;
46import java.util.concurrent.FutureTask;
47import java.util.concurrent.RunnableFuture;
48import java.util.concurrent.ThreadPoolExecutor;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.atomic.AtomicBoolean;
51
52import junit.framework.Test;
53import junit.framework.TestSuite;
54
55public class ExecutorCompletionServiceTest extends JSR166TestCase {
56    public static void main(String[] args) {
57        main(suite(), args);
58    }
59    public static Test suite() {
60        return new TestSuite(ExecutorCompletionServiceTest.class);
61    }
62
63    /**
64     * new ExecutorCompletionService(null) throws NullPointerException
65     */
66    public void testConstructorNPE() {
67        try {
68            new ExecutorCompletionService(null);
69            shouldThrow();
70        } catch (NullPointerException success) {}
71    }
72
73    /**
74     * new ExecutorCompletionService(e, null) throws NullPointerException
75     */
76    public void testConstructorNPE2() {
77        try {
78            new ExecutorCompletionService(cachedThreadPool, null);
79            shouldThrow();
80        } catch (NullPointerException success) {}
81    }
82
83    /**
84     * ecs.submit(null) throws NullPointerException
85     */
86    public void testSubmitNullCallable() {
87        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
88        try {
89            cs.submit((Callable) null);
90            shouldThrow();
91        } catch (NullPointerException success) {}
92    }
93
94    /**
95     * ecs.submit(null, val) throws NullPointerException
96     */
97    public void testSubmitNullRunnable() {
98        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
99        try {
100            cs.submit((Runnable) null, Boolean.TRUE);
101            shouldThrow();
102        } catch (NullPointerException success) {}
103    }
104
105    /**
106     * A taken submitted task is completed
107     */
108    public void testTake()
109        throws InterruptedException, ExecutionException {
110        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
111        cs.submit(new StringTask());
112        Future f = cs.take();
113        assertTrue(f.isDone());
114        assertSame(TEST_STRING, f.get());
115    }
116
117    /**
118     * Take returns the same future object returned by submit
119     */
120    public void testTake2() throws InterruptedException {
121        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
122        Future f1 = cs.submit(new StringTask());
123        Future f2 = cs.take();
124        assertSame(f1, f2);
125    }
126
127    /**
128     * poll returns non-null when the returned task is completed
129     */
130    public void testPoll1()
131        throws InterruptedException, ExecutionException {
132        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
133        assertNull(cs.poll());
134        cs.submit(new StringTask());
135
136        long startTime = System.nanoTime();
137        Future f;
138        while ((f = cs.poll()) == null) {
139            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
140                fail("timed out");
141            Thread.yield();
142        }
143        assertTrue(f.isDone());
144        assertSame(TEST_STRING, f.get());
145    }
146
147    /**
148     * timed poll returns non-null when the returned task is completed
149     */
150    public void testPoll2()
151        throws InterruptedException, ExecutionException {
152        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
153        assertNull(cs.poll());
154        cs.submit(new StringTask());
155
156        long startTime = System.nanoTime();
157        Future f;
158        while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
159            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
160                fail("timed out");
161            Thread.yield();
162        }
163        assertTrue(f.isDone());
164        assertSame(TEST_STRING, f.get());
165    }
166
167    /**
168     * poll returns null before the returned task is completed
169     */
170    public void testPollReturnsNull()
171        throws InterruptedException, ExecutionException {
172        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
173        final CountDownLatch proceed = new CountDownLatch(1);
174        cs.submit(new Callable() { public String call() throws Exception {
175            proceed.await();
176            return TEST_STRING;
177        }});
178        assertNull(cs.poll());
179        assertNull(cs.poll(0L, MILLISECONDS));
180        assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
181        long startTime = System.nanoTime();
182        assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
183        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
184        proceed.countDown();
185        assertSame(TEST_STRING, cs.take().get());
186    }
187
188    /**
189     * successful and failed tasks are both returned
190     */
191    public void testTaskAssortment()
192        throws InterruptedException, ExecutionException {
193        CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
194        ArithmeticException ex = new ArithmeticException();
195        for (int i = 0; i < 2; i++) {
196            cs.submit(new StringTask());
197            cs.submit(callableThrowing(ex));
198            cs.submit(runnableThrowing(ex), null);
199        }
200        int normalCompletions = 0;
201        int exceptionalCompletions = 0;
202        for (int i = 0; i < 3 * 2; i++) {
203            try {
204                if (cs.take().get() == TEST_STRING)
205                    normalCompletions++;
206            }
207            catch (ExecutionException expected) {
208                assertTrue(expected.getCause() instanceof ArithmeticException);
209                exceptionalCompletions++;
210            }
211        }
212        assertEquals(2 * 1, normalCompletions);
213        assertEquals(2 * 2, exceptionalCompletions);
214        assertNull(cs.poll());
215    }
216
217    /**
218     * Submitting to underlying AES that overrides newTaskFor(Callable)
219     * returns and eventually runs Future returned by newTaskFor.
220     */
221    public void testNewTaskForCallable() throws InterruptedException {
222        final AtomicBoolean done = new AtomicBoolean(false);
223        class MyCallableFuture<V> extends FutureTask<V> {
224            MyCallableFuture(Callable<V> c) { super(c); }
225            @Override protected void done() { done.set(true); }
226        }
227        final ExecutorService e =
228            new ThreadPoolExecutor(1, 1,
229                                   30L, TimeUnit.SECONDS,
230                                   new ArrayBlockingQueue<Runnable>(1)) {
231                protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
232                    return new MyCallableFuture<T>(c);
233                }};
234        CompletionService<String> cs = new ExecutorCompletionService<>(e);
235        try (PoolCleaner cleaner = cleaner(e)) {
236            assertNull(cs.poll());
237            Callable<String> c = new StringTask();
238            Future f1 = cs.submit(c);
239            assertTrue("submit must return MyCallableFuture",
240                       f1 instanceof MyCallableFuture);
241            Future f2 = cs.take();
242            assertSame("submit and take must return same objects", f1, f2);
243            assertTrue("completed task must have set done", done.get());
244        }
245    }
246
247    /**
248     * Submitting to underlying AES that overrides newTaskFor(Runnable,T)
249     * returns and eventually runs Future returned by newTaskFor.
250     */
251    public void testNewTaskForRunnable() throws InterruptedException {
252        final AtomicBoolean done = new AtomicBoolean(false);
253        class MyRunnableFuture<V> extends FutureTask<V> {
254            MyRunnableFuture(Runnable t, V r) { super(t, r); }
255            @Override protected void done() { done.set(true); }
256        }
257        final ExecutorService e =
258            new ThreadPoolExecutor(1, 1,
259                                   30L, TimeUnit.SECONDS,
260                                   new ArrayBlockingQueue<Runnable>(1)) {
261                protected <T> RunnableFuture<T> newTaskFor(Runnable t, T r) {
262                    return new MyRunnableFuture<T>(t, r);
263                }};
264        CompletionService<String> cs = new ExecutorCompletionService<>(e);
265        try (PoolCleaner cleaner = cleaner(e)) {
266            assertNull(cs.poll());
267            Runnable r = new NoOpRunnable();
268            Future f1 = cs.submit(r, null);
269            assertTrue("submit must return MyRunnableFuture",
270                       f1 instanceof MyRunnableFuture);
271            Future f2 = cs.take();
272            assertSame("submit and take must return same objects", f1, f2);
273            assertTrue("completed task must have set done", done.get());
274        }
275    }
276
277}
278