1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 * version 2 for more details (a copy is included in the LICENSE file that
12 * accompanied this code).
13 *
14 * You should have received a copy of the GNU General Public License version
15 * 2 along with this work; if not, write to the Free Software Foundation,
16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23/*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/publicdomain/zero/1.0/
32 */
33
34/*
35 * @test
36 * @bug 4486658
37 * @summary  check ordering for blocking queues with 1 producer and multiple consumers
38 * @library /lib/testlibrary/
39 */
40
41import static java.util.concurrent.TimeUnit.MILLISECONDS;
42import static java.util.concurrent.TimeUnit.NANOSECONDS;
43
44import java.util.concurrent.ArrayBlockingQueue;
45import java.util.concurrent.BlockingQueue;
46import java.util.concurrent.CyclicBarrier;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
49import java.util.concurrent.LinkedBlockingDeque;
50import java.util.concurrent.LinkedBlockingQueue;
51import java.util.concurrent.LinkedTransferQueue;
52import java.util.concurrent.PriorityBlockingQueue;
53import java.util.concurrent.SynchronousQueue;
54import jdk.testlibrary.Utils;
55
56public class SingleProducerMultipleConsumerLoops {
57    static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
58    static ExecutorService pool;
59
60    public static void main(String[] args) throws Exception {
61        final int maxConsumers = (args.length > 0)
62            ? Integer.parseInt(args[0])
63            : 5;
64
65        pool = Executors.newCachedThreadPool();
66        for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
67            // Adjust iterations to limit typical single runs to <= 10 ms;
68            // Notably, fair queues get fewer iters.
69            // Unbounded queues can legitimately OOME if iterations
70            // high enough, but we have a sufficiently low limit here.
71            run(new ArrayBlockingQueue<Integer>(100), i, 1000);
72            run(new LinkedBlockingQueue<Integer>(100), i, 1000);
73            run(new LinkedBlockingDeque<Integer>(100), i, 1000);
74            run(new LinkedTransferQueue<Integer>(), i, 700);
75            run(new PriorityBlockingQueue<Integer>(), i, 1000);
76            run(new SynchronousQueue<Integer>(), i, 300);
77            run(new SynchronousQueue<Integer>(true), i, 200);
78            run(new ArrayBlockingQueue<Integer>(100, true), i, 100);
79        }
80        pool.shutdown();
81        if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS))
82            throw new Error();
83        pool = null;
84   }
85
86    static void run(BlockingQueue<Integer> queue, int consumers, int iters) throws Exception {
87        new SingleProducerMultipleConsumerLoops(queue, consumers, iters).run();
88    }
89
90    final BlockingQueue<Integer> queue;
91    final int consumers;
92    final int iters;
93    final LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
94    final CyclicBarrier barrier;
95    Throwable fail;
96
97    SingleProducerMultipleConsumerLoops(BlockingQueue<Integer> queue, int consumers, int iters) {
98        this.queue = queue;
99        this.consumers = consumers;
100        this.iters = iters;
101        this.barrier = new CyclicBarrier(consumers + 2, timer);
102    }
103
104    void run() throws Exception {
105        pool.execute(new Producer());
106        for (int i = 0; i < consumers; i++) {
107            pool.execute(new Consumer());
108        }
109        barrier.await();
110        barrier.await();
111        System.out.printf("%s, consumers=%d: %d ms%n",
112                          queue.getClass().getSimpleName(), consumers,
113                          NANOSECONDS.toMillis(timer.getTime()));
114        if (fail != null) throw new AssertionError(fail);
115    }
116
117    abstract class CheckedRunnable implements Runnable {
118        abstract void realRun() throws Throwable;
119        public final void run() {
120            try {
121                realRun();
122            } catch (Throwable t) {
123                fail = t;
124                t.printStackTrace();
125                throw new AssertionError(t);
126            }
127        }
128    }
129
130    class Producer extends CheckedRunnable {
131        volatile int result;
132        void realRun() throws Throwable {
133            barrier.await();
134            for (int i = 0; i < iters * consumers; i++) {
135                queue.put(new Integer(i));
136            }
137            barrier.await();
138            result = 432;
139        }
140    }
141
142    class Consumer extends CheckedRunnable {
143        volatile int result;
144        void realRun() throws Throwable {
145            barrier.await();
146            int l = 0;
147            int s = 0;
148            int last = -1;
149            for (int i = 0; i < iters; i++) {
150                Integer item = queue.take();
151                int v = item.intValue();
152                if (v < last)
153                    throw new Error("Out-of-Order transfer");
154                last = v;
155                l = LoopHelpers.compute1(v);
156                s += l;
157            }
158            barrier.await();
159            result = s;
160        }
161    }
162}
163