1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 * version 2 for more details (a copy is included in the LICENSE file that
12 * accompanied this code).
13 *
14 * You should have received a copy of the GNU General Public License version
15 * 2 along with this work; if not, write to the Free Software Foundation,
16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23/*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/publicdomain/zero/1.0/
32 */
33
34/*
35 * @test
36 * @bug 4486658
37 * @summary  multiple producers and single consumer using blocking queues
38 * @library /lib/testlibrary/
39 */
40
41import static java.util.concurrent.TimeUnit.MILLISECONDS;
42import static java.util.concurrent.TimeUnit.NANOSECONDS;
43
44import java.util.concurrent.ArrayBlockingQueue;
45import java.util.concurrent.BlockingQueue;
46import java.util.concurrent.CyclicBarrier;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.concurrent.LinkedBlockingDeque;
50import java.util.concurrent.LinkedBlockingQueue;
51import java.util.concurrent.LinkedTransferQueue;
52import java.util.concurrent.PriorityBlockingQueue;
53import java.util.concurrent.SynchronousQueue;
54import java.util.concurrent.atomic.AtomicInteger;
55import jdk.testlibrary.Utils;
56
57public class MultipleProducersSingleConsumerLoops {
58    static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
59    static ExecutorService pool;
60
61    public static void main(String[] args) throws Exception {
62        final int maxProducers = (args.length > 0)
63            ? Integer.parseInt(args[0])
64            : 5;
65
66        pool = Executors.newCachedThreadPool();
67        for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
68            // Adjust iterations to limit typical single runs to <= 10 ms;
69            // Notably, fair queues get fewer iters.
70            // Unbounded queues can legitimately OOME if iterations
71            // high enough, but we have a sufficiently low limit here.
72            run(new ArrayBlockingQueue<Integer>(100), i, 300);
73            run(new LinkedBlockingQueue<Integer>(100), i, 700);
74            run(new LinkedBlockingDeque<Integer>(100), i , 500);
75            run(new LinkedTransferQueue<Integer>(), i, 1000);
76            run(new PriorityBlockingQueue<Integer>(), i, 1000);
77            run(new SynchronousQueue<Integer>(), i, 500);
78            run(new SynchronousQueue<Integer>(true), i, 200);
79            run(new ArrayBlockingQueue<Integer>(100, true), i, 100);
80        }
81
82        pool.shutdown();
83        if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS))
84            throw new Error();
85        pool = null;
86    }
87
88    static void run(BlockingQueue<Integer> queue, int nproducers, int iters) throws Exception {
89        new MultipleProducersSingleConsumerLoops(queue, nproducers, iters).run();
90    }
91
92    final BlockingQueue<Integer> queue;
93    final int nproducers;
94    final int iters;
95    final LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
96    final CyclicBarrier barrier;
97    final AtomicInteger checksum = new AtomicInteger(0);
98    Throwable fail;
99
100    MultipleProducersSingleConsumerLoops(BlockingQueue<Integer> queue, int nproducers, int iters) {
101        this.queue = queue;
102        this.nproducers = nproducers;
103        this.iters = iters;
104        this.barrier = new CyclicBarrier(nproducers + 2, timer);
105    }
106
107    void run() throws Exception {
108        for (int i = 0; i < nproducers; i++)
109            pool.execute(new Producer());
110        pool.execute(new Consumer());
111        barrier.await();
112        barrier.await();
113        System.out.printf("%s, nproducers=%d:  %d ms%n",
114                          queue.getClass().getSimpleName(), nproducers,
115                          NANOSECONDS.toMillis(timer.getTime()));
116        if (checksum.get() != 0) throw new AssertionError("checksum mismatch");
117        if (fail != null) throw new AssertionError(fail);
118    }
119
120    abstract class CheckedRunnable implements Runnable {
121        abstract void realRun() throws Throwable;
122        public final void run() {
123            try {
124                realRun();
125            } catch (Throwable t) {
126                fail = t;
127                t.printStackTrace();
128                throw new AssertionError(t);
129            }
130        }
131    }
132
133    class Producer extends CheckedRunnable {
134        void realRun() throws Throwable {
135            barrier.await();
136            int s = 0;
137            int l = hashCode();
138            for (int i = 0; i < iters; i++) {
139                l = LoopHelpers.compute1(l);
140                l = LoopHelpers.compute2(l);
141                queue.put(new Integer(l));
142                s += l;
143            }
144            checksum.getAndAdd(s);
145            barrier.await();
146        }
147    }
148
149    class Consumer extends CheckedRunnable {
150        void realRun() throws Throwable {
151            barrier.await();
152            int s = 0;
153            for (int i = 0; i < nproducers * iters; i++) {
154                s += queue.take().intValue();
155            }
156            checksum.getAndAdd(-s);
157            barrier.await();
158        }
159    }
160}
161