1/*
2 * Copyright (c) 2007, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24/*
25 * @test
26 * @bug 6450200
27 * @summary Test proper handling of pool state changes
28 * @library /lib/testlibrary/
29 * @build jdk.testlibrary.RandomFactory
30 * @run main/othervm ConfigChanges
31 * @key randomness
32 * @author Martin Buchholz
33 */
34
35import static java.util.concurrent.TimeUnit.MILLISECONDS;
36import static java.util.concurrent.TimeUnit.MINUTES;
37import static java.util.concurrent.TimeUnit.NANOSECONDS;
38
39import java.security.Permission;
40import java.util.Random;
41import java.util.concurrent.ArrayBlockingQueue;
42import java.util.concurrent.CyclicBarrier;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.RejectedExecutionException;
45import java.util.concurrent.ThreadFactory;
46import java.util.concurrent.ThreadPoolExecutor;
47import java.util.function.Supplier;
48import jdk.testlibrary.RandomFactory;
49
50public class ConfigChanges {
51    static final ThreadGroup tg = new ThreadGroup("pool");
52
53    static final Random rnd = RandomFactory.getRandom();
54
55    static void report(ThreadPoolExecutor tpe) {
56        try {
57            System.out.printf(
58                "active=%d submitted=%d completed=%d queued=%d sizes=%d/%d/%d%n",
59                tg.activeCount(),
60                tpe.getTaskCount(),
61                tpe.getCompletedTaskCount(),
62                tpe.getQueue().size(),
63                tpe.getPoolSize(),
64                tpe.getCorePoolSize(),
65                tpe.getMaximumPoolSize());
66        } catch (Throwable t) { unexpected(t); }
67    }
68
69    static void report(String label, ThreadPoolExecutor tpe) {
70        System.out.printf("%10s ", label);
71        report(tpe);
72    }
73
74    static class PermissiveSecurityManger extends SecurityManager {
75        public void checkPermission(Permission p) { /* bien sur, Monsieur */ }
76    }
77
78    static void checkShutdown(final ExecutorService es) {
79        final Runnable nop = new Runnable() {public void run() {}};
80        try {
81            if (new Random().nextBoolean()) {
82                check(es.isShutdown());
83                if (es instanceof ThreadPoolExecutor)
84                    check(((ThreadPoolExecutor) es).isTerminating()
85                          || es.isTerminated());
86                THROWS(RejectedExecutionException.class,
87                       () -> es.execute(nop));
88            }
89        } catch (Throwable t) { unexpected(t); }
90    }
91
92    static void checkTerminated(final ThreadPoolExecutor tpe) {
93        try {
94            checkShutdown(tpe);
95            check(tpe.getQueue().isEmpty());
96            check(tpe.isTerminated());
97            check(! tpe.isTerminating());
98            equal(0, tpe.getActiveCount());
99            equal(0, tpe.getPoolSize());
100            equal(tpe.getTaskCount(), tpe.getCompletedTaskCount());
101            check(tpe.awaitTermination(0L, MINUTES));
102        } catch (Throwable t) { unexpected(t); }
103    }
104
105    static Runnable waiter(final CyclicBarrier barrier) {
106        return new Runnable() { public void run() {
107            try { barrier.await(); barrier.await(); }
108            catch (Throwable t) { unexpected(t); }}};
109    }
110
111    static volatile Runnable runnableDuJour;
112
113    static void awaitIdleness(ThreadPoolExecutor tpe, long taskCount) {
114        restart: for (;;) {
115            // check twice to make chance of race vanishingly small
116            for (int i = 0; i < 2; i++) {
117                if (tpe.getQueue().size() != 0 ||
118                    tpe.getActiveCount() != 0 ||
119                    tpe.getCompletedTaskCount() != taskCount) {
120                    Thread.yield();
121                    continue restart;
122                }
123            }
124            return;
125        }
126    }
127
128    /**
129     * Waits for condition to become true, first spin-polling, then sleep-polling.
130     */
131    static void spinAwait(Supplier<Boolean> waitingForGodot) {
132        for (int spins = 0; !waitingForGodot.get(); ) {
133            if ((spins = (spins + 1) & 3) > 0) {
134                Thread.yield();
135            } else {
136                try { Thread.sleep(4); }
137                catch (InterruptedException unexpected) {
138                    throw new AssertionError(unexpected);
139                }
140            }
141        }
142    }
143
144    private static void realMain(String[] args) throws Throwable {
145        if (rnd.nextBoolean())
146            System.setSecurityManager(new PermissiveSecurityManger());
147
148        final boolean prestart = rnd.nextBoolean();
149
150        final Thread.UncaughtExceptionHandler handler
151            = new Thread.UncaughtExceptionHandler() {
152                    public void uncaughtException(Thread t, Throwable e) {
153                        check(! Thread.currentThread().isInterrupted());
154                        unexpected(e);
155                    }};
156
157        final int n = 3;
158        final ThreadPoolExecutor tpe
159            = new ThreadPoolExecutor(n, 3*n,
160                                     3L, MINUTES,
161                                     new ArrayBlockingQueue<Runnable>(3*n));
162        tpe.setThreadFactory(new ThreadFactory() {
163                public Thread newThread(Runnable r) {
164                    Thread t = new Thread(tg, r);
165                    t.setUncaughtExceptionHandler(handler);
166                    return t;
167                }});
168
169        if (prestart) {
170            tpe.prestartAllCoreThreads();
171            equal(n, tg.activeCount());
172            equal(n, tpe.getCorePoolSize());
173            equal(n, tpe.getLargestPoolSize());
174        }
175
176        final Runnable runRunnableDuJour =
177            new Runnable() { public void run() {
178                // Delay choice of action till last possible moment.
179                runnableDuJour.run(); }};
180        final CyclicBarrier pumpedUp = new CyclicBarrier(3*n + 1);
181        runnableDuJour = waiter(pumpedUp);
182
183        if (prestart) {
184            for (int i = 0; i < 1*n; i++)
185                tpe.execute(runRunnableDuJour);
186            // Wait for prestarted threads to dequeue their initial tasks.
187            while (! tpe.getQueue().isEmpty())
188                Thread.sleep(1);
189            for (int i = 0; i < 5*n; i++)
190                tpe.execute(runRunnableDuJour);
191        } else {
192            for (int i = 0; i < 6*n; i++)
193                tpe.execute(runRunnableDuJour);
194        }
195
196        //report("submitted", tpe);
197        pumpedUp.await();
198        equal(3*n, tg.activeCount());
199        equal(3*n, tpe.getMaximumPoolSize());
200        equal(3*n, tpe.getLargestPoolSize());
201        equal(n, tpe.getCorePoolSize());
202        equal(3*n, tpe.getActiveCount());
203        equal(6L*n, tpe.getTaskCount());
204        equal(0L, tpe.getCompletedTaskCount());
205
206        //report("pumped up", tpe);
207        tpe.setMaximumPoolSize(4*n);
208        equal(4*n, tpe.getMaximumPoolSize());
209        //report("pumped up2", tpe);
210        final CyclicBarrier pumpedUp2 = new CyclicBarrier(n + 1);
211        runnableDuJour = waiter(pumpedUp2);
212        for (int i = 0; i < 1*n; i++)
213            tpe.execute(runRunnableDuJour);
214        pumpedUp2.await();
215        equal(4*n, tg.activeCount());
216        equal(4*n, tpe.getMaximumPoolSize());
217        equal(4*n, tpe.getLargestPoolSize());
218        equal(4*n, tpe.getActiveCount());
219        equal(7L*n, tpe.getTaskCount());
220        equal(0L, tpe.getCompletedTaskCount());
221        //report("pumped up2", tpe);
222        runnableDuJour = new Runnable() { public void run() {}};
223
224        tpe.setMaximumPoolSize(2*n);
225        //report("after setMaximumPoolSize", tpe);
226
227        pumpedUp2.await();
228        pumpedUp.await();
229
230        spinAwait(() -> tg.activeCount() == 2*n);
231        equal(2*n, tpe.getMaximumPoolSize());
232        equal(4*n, tpe.getLargestPoolSize());
233
234        //report("draining", tpe);
235        awaitIdleness(tpe, 7L*n);
236
237        equal(2*n, tg.activeCount());
238        equal(2*n, tpe.getMaximumPoolSize());
239        equal(4*n, tpe.getLargestPoolSize());
240
241        equal(7L*n, tpe.getTaskCount());
242        equal(7L*n, tpe.getCompletedTaskCount());
243        equal(0, tpe.getActiveCount());
244
245        equal(3L, tpe.getKeepAliveTime(MINUTES));
246        long t0 = System.nanoTime();
247        tpe.setKeepAliveTime(7L, MILLISECONDS);
248        equal(7L, tpe.getKeepAliveTime(MILLISECONDS));
249        spinAwait(() -> tg.activeCount() == n);
250        check(System.nanoTime() - t0 >= tpe.getKeepAliveTime(NANOSECONDS));
251
252        //report("idle", tpe);
253        check(! tpe.allowsCoreThreadTimeOut());
254        t0 = System.nanoTime();
255        tpe.allowCoreThreadTimeOut(true);
256        check(tpe.allowsCoreThreadTimeOut());
257        spinAwait(() -> tg.activeCount() == 0);
258
259        // The following assertion is almost always true, but may
260        // exceptionally not be during a transition from core count
261        // too high to allowCoreThreadTimeOut.  Users will never
262        // notice, and we accept the small loss of testability.
263        //
264        // check(System.nanoTime() - t0 >= tpe.getKeepAliveTime(NANOSECONDS));
265
266        //report("idle", tpe);
267
268        tpe.shutdown();
269        checkShutdown(tpe);
270        check(tpe.awaitTermination(3L, MINUTES));
271        checkTerminated(tpe);
272    }
273
274    //--------------------- Infrastructure ---------------------------
275    static volatile int passed = 0, failed = 0;
276    static void pass() {passed++;}
277    static void fail() {failed++; Thread.dumpStack();}
278    static void fail(String msg) {System.out.println(msg); fail();}
279    static void unexpected(Throwable t) {failed++; t.printStackTrace();}
280    static void check(boolean cond) {if (cond) pass(); else fail();}
281    static void equal(Object x, Object y) {
282        if (x == null ? y == null : x.equals(y)) pass();
283        else fail(x + " not equal to " + y);}
284    public static void main(String[] args) throws Throwable {
285        try {realMain(args);} catch (Throwable t) {unexpected(t);}
286        System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
287        if (failed > 0) throw new AssertionError("Some tests failed");}
288    interface Fun {void f() throws Throwable;}
289    static void THROWS(Class<? extends Throwable> k, Fun... fs) {
290        for (Fun f : fs)
291            try { f.f(); fail("Expected " + k.getName() + " not thrown"); }
292            catch (Throwable t) {
293                if (k.isAssignableFrom(t.getClass())) pass();
294                else unexpected(t);}}
295}
296