1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 * version 2 for more details (a copy is included in the LICENSE file that
12 * accompanied this code).
13 *
14 * You should have received a copy of the GNU General Public License version
15 * 2 along with this work; if not, write to the Free Software Foundation,
16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23/*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/publicdomain/zero/1.0/
32 * Other contributors include Andrew Wright, Jeffrey Hayes,
33 * Pat Fisher, Mike Judd.
34 */
35
36import static java.util.concurrent.TimeUnit.MILLISECONDS;
37
38import java.util.ArrayList;
39import java.util.Arrays;
40import java.util.Collection;
41import java.util.Iterator;
42import java.util.NoSuchElementException;
43import java.util.concurrent.BlockingQueue;
44import java.util.concurrent.CountDownLatch;
45import java.util.concurrent.Executors;
46import java.util.concurrent.ExecutorService;
47import java.util.concurrent.SynchronousQueue;
48
49import junit.framework.Test;
50
51public class SynchronousQueueTest extends JSR166TestCase {
52
53    public static class Fair extends BlockingQueueTest {
54        protected BlockingQueue emptyCollection() {
55            return new SynchronousQueue(true);
56        }
57    }
58
59    public static class NonFair extends BlockingQueueTest {
60        protected BlockingQueue emptyCollection() {
61            return new SynchronousQueue(false);
62        }
63    }
64
65    public static void main(String[] args) {
66        main(suite(), args);
67    }
68
69    public static Test suite() {
70        return newTestSuite(SynchronousQueueTest.class,
71                            new Fair().testSuite(),
72                            new NonFair().testSuite());
73    }
74
75    /**
76     * Any SynchronousQueue is both empty and full
77     */
78    public void testEmptyFull()      { testEmptyFull(false); }
79    public void testEmptyFull_fair() { testEmptyFull(true); }
80    public void testEmptyFull(boolean fair) {
81        final SynchronousQueue q = new SynchronousQueue(fair);
82        assertTrue(q.isEmpty());
83        assertEquals(0, q.size());
84        assertEquals(0, q.remainingCapacity());
85        assertFalse(q.offer(zero));
86    }
87
88    /**
89     * offer fails if no active taker
90     */
91    public void testOffer()      { testOffer(false); }
92    public void testOffer_fair() { testOffer(true); }
93    public void testOffer(boolean fair) {
94        SynchronousQueue q = new SynchronousQueue(fair);
95        assertFalse(q.offer(one));
96    }
97
98    /**
99     * add throws IllegalStateException if no active taker
100     */
101    public void testAdd()      { testAdd(false); }
102    public void testAdd_fair() { testAdd(true); }
103    public void testAdd(boolean fair) {
104        SynchronousQueue q = new SynchronousQueue(fair);
105        assertEquals(0, q.remainingCapacity());
106        try {
107            q.add(one);
108            shouldThrow();
109        } catch (IllegalStateException success) {}
110    }
111
112    /**
113     * addAll(this) throws IllegalArgumentException
114     */
115    public void testAddAll_self()      { testAddAll_self(false); }
116    public void testAddAll_self_fair() { testAddAll_self(true); }
117    public void testAddAll_self(boolean fair) {
118        SynchronousQueue q = new SynchronousQueue(fair);
119        try {
120            q.addAll(q);
121            shouldThrow();
122        } catch (IllegalArgumentException success) {}
123    }
124
125    /**
126     * addAll throws ISE if no active taker
127     */
128    public void testAddAll_ISE()      { testAddAll_ISE(false); }
129    public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
130    public void testAddAll_ISE(boolean fair) {
131        SynchronousQueue q = new SynchronousQueue(fair);
132        Integer[] ints = new Integer[1];
133        for (int i = 0; i < ints.length; i++)
134            ints[i] = i;
135        Collection<Integer> coll = Arrays.asList(ints);
136        try {
137            q.addAll(coll);
138            shouldThrow();
139        } catch (IllegalStateException success) {}
140    }
141
142    /**
143     * put blocks interruptibly if no active taker
144     */
145    public void testBlockingPut()      { testBlockingPut(false); }
146    public void testBlockingPut_fair() { testBlockingPut(true); }
147    public void testBlockingPut(boolean fair) {
148        final SynchronousQueue q = new SynchronousQueue(fair);
149        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
150        Thread t = newStartedThread(new CheckedRunnable() {
151            public void realRun() throws InterruptedException {
152                Thread.currentThread().interrupt();
153                try {
154                    q.put(99);
155                    shouldThrow();
156                } catch (InterruptedException success) {}
157                assertFalse(Thread.interrupted());
158
159                pleaseInterrupt.countDown();
160                try {
161                    q.put(99);
162                    shouldThrow();
163                } catch (InterruptedException success) {}
164                assertFalse(Thread.interrupted());
165            }});
166
167        await(pleaseInterrupt);
168        assertThreadStaysAlive(t);
169        t.interrupt();
170        awaitTermination(t);
171        assertEquals(0, q.remainingCapacity());
172    }
173
174    /**
175     * put blocks interruptibly waiting for take
176     */
177    public void testPutWithTake()      { testPutWithTake(false); }
178    public void testPutWithTake_fair() { testPutWithTake(true); }
179    public void testPutWithTake(boolean fair) {
180        final SynchronousQueue q = new SynchronousQueue(fair);
181        final CountDownLatch pleaseTake = new CountDownLatch(1);
182        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
183        Thread t = newStartedThread(new CheckedRunnable() {
184            public void realRun() throws InterruptedException {
185                pleaseTake.countDown();
186                q.put(one);
187
188                pleaseInterrupt.countDown();
189                try {
190                    q.put(99);
191                    shouldThrow();
192                } catch (InterruptedException success) {}
193                assertFalse(Thread.interrupted());
194            }});
195
196        await(pleaseTake);
197        assertEquals(0, q.remainingCapacity());
198        try { assertSame(one, q.take()); }
199        catch (InterruptedException e) { threadUnexpectedException(e); }
200
201        await(pleaseInterrupt);
202        assertThreadStaysAlive(t);
203        t.interrupt();
204        awaitTermination(t);
205        assertEquals(0, q.remainingCapacity());
206    }
207
208    /**
209     * timed offer times out if elements not taken
210     */
211    public void testTimedOffer()      { testTimedOffer(false); }
212    public void testTimedOffer_fair() { testTimedOffer(true); }
213    public void testTimedOffer(boolean fair) {
214        final SynchronousQueue q = new SynchronousQueue(fair);
215        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
216        Thread t = newStartedThread(new CheckedRunnable() {
217            public void realRun() throws InterruptedException {
218                long startTime = System.nanoTime();
219                assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
220                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
221                pleaseInterrupt.countDown();
222                try {
223                    q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
224                    shouldThrow();
225                } catch (InterruptedException success) {}
226            }});
227
228        await(pleaseInterrupt);
229        assertThreadStaysAlive(t);
230        t.interrupt();
231        awaitTermination(t);
232    }
233
234    /**
235     * poll return null if no active putter
236     */
237    public void testPoll()      { testPoll(false); }
238    public void testPoll_fair() { testPoll(true); }
239    public void testPoll(boolean fair) {
240        final SynchronousQueue q = new SynchronousQueue(fair);
241        assertNull(q.poll());
242    }
243
244    /**
245     * timed poll with zero timeout times out if no active putter
246     */
247    public void testTimedPoll0()      { testTimedPoll0(false); }
248    public void testTimedPoll0_fair() { testTimedPoll0(true); }
249    public void testTimedPoll0(boolean fair) {
250        final SynchronousQueue q = new SynchronousQueue(fair);
251        try { assertNull(q.poll(0, MILLISECONDS)); }
252        catch (InterruptedException e) { threadUnexpectedException(e); }
253    }
254
255    /**
256     * timed poll with nonzero timeout times out if no active putter
257     */
258    public void testTimedPoll()      { testTimedPoll(false); }
259    public void testTimedPoll_fair() { testTimedPoll(true); }
260    public void testTimedPoll(boolean fair) {
261        final SynchronousQueue q = new SynchronousQueue(fair);
262        long startTime = System.nanoTime();
263        try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
264        catch (InterruptedException e) { threadUnexpectedException(e); }
265        assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
266    }
267
268    /**
269     * timed poll before a delayed offer times out, returning null;
270     * after offer succeeds; on interruption throws
271     */
272    public void testTimedPollWithOffer()      { testTimedPollWithOffer(false); }
273    public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); }
274    public void testTimedPollWithOffer(boolean fair) {
275        final SynchronousQueue q = new SynchronousQueue(fair);
276        final CountDownLatch pleaseOffer = new CountDownLatch(1);
277        final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
278        Thread t = newStartedThread(new CheckedRunnable() {
279            public void realRun() throws InterruptedException {
280                long startTime = System.nanoTime();
281                assertNull(q.poll(timeoutMillis(), MILLISECONDS));
282                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
283
284                pleaseOffer.countDown();
285                startTime = System.nanoTime();
286                assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
287
288                Thread.currentThread().interrupt();
289                try {
290                    q.poll(LONG_DELAY_MS, MILLISECONDS);
291                    shouldThrow();
292                } catch (InterruptedException success) {}
293                assertFalse(Thread.interrupted());
294
295                pleaseInterrupt.countDown();
296                try {
297                    q.poll(LONG_DELAY_MS, MILLISECONDS);
298                    shouldThrow();
299                } catch (InterruptedException success) {}
300                assertFalse(Thread.interrupted());
301
302                assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
303            }});
304
305        await(pleaseOffer);
306        long startTime = System.nanoTime();
307        try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
308        catch (InterruptedException e) { threadUnexpectedException(e); }
309        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
310
311        await(pleaseInterrupt);
312        assertThreadStaysAlive(t);
313        t.interrupt();
314        awaitTermination(t);
315    }
316
317    /**
318     * peek() returns null if no active putter
319     */
320    public void testPeek()      { testPeek(false); }
321    public void testPeek_fair() { testPeek(true); }
322    public void testPeek(boolean fair) {
323        final SynchronousQueue q = new SynchronousQueue(fair);
324        assertNull(q.peek());
325    }
326
327    /**
328     * element() throws NoSuchElementException if no active putter
329     */
330    public void testElement()      { testElement(false); }
331    public void testElement_fair() { testElement(true); }
332    public void testElement(boolean fair) {
333        final SynchronousQueue q = new SynchronousQueue(fair);
334        try {
335            q.element();
336            shouldThrow();
337        } catch (NoSuchElementException success) {}
338    }
339
340    /**
341     * remove() throws NoSuchElementException if no active putter
342     */
343    public void testRemove()      { testRemove(false); }
344    public void testRemove_fair() { testRemove(true); }
345    public void testRemove(boolean fair) {
346        final SynchronousQueue q = new SynchronousQueue(fair);
347        try {
348            q.remove();
349            shouldThrow();
350        } catch (NoSuchElementException success) {}
351    }
352
353    /**
354     * contains returns false
355     */
356    public void testContains()      { testContains(false); }
357    public void testContains_fair() { testContains(true); }
358    public void testContains(boolean fair) {
359        final SynchronousQueue q = new SynchronousQueue(fair);
360        assertFalse(q.contains(zero));
361    }
362
363    /**
364     * clear ensures isEmpty
365     */
366    public void testClear()      { testClear(false); }
367    public void testClear_fair() { testClear(true); }
368    public void testClear(boolean fair) {
369        final SynchronousQueue q = new SynchronousQueue(fair);
370        q.clear();
371        assertTrue(q.isEmpty());
372    }
373
374    /**
375     * containsAll returns false unless empty
376     */
377    public void testContainsAll()      { testContainsAll(false); }
378    public void testContainsAll_fair() { testContainsAll(true); }
379    public void testContainsAll(boolean fair) {
380        final SynchronousQueue q = new SynchronousQueue(fair);
381        Integer[] empty = new Integer[0];
382        assertTrue(q.containsAll(Arrays.asList(empty)));
383        Integer[] ints = new Integer[1]; ints[0] = zero;
384        assertFalse(q.containsAll(Arrays.asList(ints)));
385    }
386
387    /**
388     * retainAll returns false
389     */
390    public void testRetainAll()      { testRetainAll(false); }
391    public void testRetainAll_fair() { testRetainAll(true); }
392    public void testRetainAll(boolean fair) {
393        final SynchronousQueue q = new SynchronousQueue(fair);
394        Integer[] empty = new Integer[0];
395        assertFalse(q.retainAll(Arrays.asList(empty)));
396        Integer[] ints = new Integer[1]; ints[0] = zero;
397        assertFalse(q.retainAll(Arrays.asList(ints)));
398    }
399
400    /**
401     * removeAll returns false
402     */
403    public void testRemoveAll()      { testRemoveAll(false); }
404    public void testRemoveAll_fair() { testRemoveAll(true); }
405    public void testRemoveAll(boolean fair) {
406        final SynchronousQueue q = new SynchronousQueue(fair);
407        Integer[] empty = new Integer[0];
408        assertFalse(q.removeAll(Arrays.asList(empty)));
409        Integer[] ints = new Integer[1]; ints[0] = zero;
410        assertFalse(q.containsAll(Arrays.asList(ints)));
411    }
412
413    /**
414     * toArray is empty
415     */
416    public void testToArray()      { testToArray(false); }
417    public void testToArray_fair() { testToArray(true); }
418    public void testToArray(boolean fair) {
419        final SynchronousQueue q = new SynchronousQueue(fair);
420        Object[] o = q.toArray();
421        assertEquals(0, o.length);
422    }
423
424    /**
425     * toArray(Integer array) returns its argument with the first
426     * element (if present) nulled out
427     */
428    public void testToArray2()      { testToArray2(false); }
429    public void testToArray2_fair() { testToArray2(true); }
430    public void testToArray2(boolean fair) {
431        final SynchronousQueue<Integer> q = new SynchronousQueue<>(fair);
432        Integer[] a;
433
434        a = new Integer[0];
435        assertSame(a, q.toArray(a));
436
437        a = new Integer[3];
438        Arrays.fill(a, 42);
439        assertSame(a, q.toArray(a));
440        assertNull(a[0]);
441        for (int i = 1; i < a.length; i++)
442            assertEquals(42, (int) a[i]);
443    }
444
445    /**
446     * toArray(null) throws NPE
447     */
448    public void testToArray_null()      { testToArray_null(false); }
449    public void testToArray_null_fair() { testToArray_null(true); }
450    public void testToArray_null(boolean fair) {
451        final SynchronousQueue q = new SynchronousQueue(fair);
452        try {
453            Object[] o = q.toArray(null);
454            shouldThrow();
455        } catch (NullPointerException success) {}
456    }
457
458    /**
459     * iterator does not traverse any elements
460     */
461    public void testIterator()      { testIterator(false); }
462    public void testIterator_fair() { testIterator(true); }
463    public void testIterator(boolean fair) {
464        assertIteratorExhausted(new SynchronousQueue(fair).iterator());
465    }
466
467    /**
468     * iterator remove throws ISE
469     */
470    public void testIteratorRemove()      { testIteratorRemove(false); }
471    public void testIteratorRemove_fair() { testIteratorRemove(true); }
472    public void testIteratorRemove(boolean fair) {
473        final SynchronousQueue q = new SynchronousQueue(fair);
474        Iterator it = q.iterator();
475        try {
476            it.remove();
477            shouldThrow();
478        } catch (IllegalStateException success) {}
479    }
480
481    /**
482     * toString returns a non-null string
483     */
484    public void testToString()      { testToString(false); }
485    public void testToString_fair() { testToString(true); }
486    public void testToString(boolean fair) {
487        final SynchronousQueue q = new SynchronousQueue(fair);
488        String s = q.toString();
489        assertNotNull(s);
490    }
491
492    /**
493     * offer transfers elements across Executor tasks
494     */
495    public void testOfferInExecutor()      { testOfferInExecutor(false); }
496    public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
497    public void testOfferInExecutor(boolean fair) {
498        final SynchronousQueue q = new SynchronousQueue(fair);
499        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
500        final ExecutorService executor = Executors.newFixedThreadPool(2);
501        try (PoolCleaner cleaner = cleaner(executor)) {
502
503            executor.execute(new CheckedRunnable() {
504                public void realRun() throws InterruptedException {
505                    assertFalse(q.offer(one));
506                    threadsStarted.await();
507                    assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
508                    assertEquals(0, q.remainingCapacity());
509                }});
510
511            executor.execute(new CheckedRunnable() {
512                public void realRun() throws InterruptedException {
513                    threadsStarted.await();
514                    assertSame(one, q.take());
515                }});
516        }
517    }
518
519    /**
520     * timed poll retrieves elements across Executor threads
521     */
522    public void testPollInExecutor()      { testPollInExecutor(false); }
523    public void testPollInExecutor_fair() { testPollInExecutor(true); }
524    public void testPollInExecutor(boolean fair) {
525        final SynchronousQueue q = new SynchronousQueue(fair);
526        final CheckedBarrier threadsStarted = new CheckedBarrier(2);
527        final ExecutorService executor = Executors.newFixedThreadPool(2);
528        try (PoolCleaner cleaner = cleaner(executor)) {
529            executor.execute(new CheckedRunnable() {
530                public void realRun() throws InterruptedException {
531                    assertNull(q.poll());
532                    threadsStarted.await();
533                    assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
534                    assertTrue(q.isEmpty());
535                }});
536
537            executor.execute(new CheckedRunnable() {
538                public void realRun() throws InterruptedException {
539                    threadsStarted.await();
540                    q.put(one);
541                }});
542        }
543    }
544
545    /**
546     * a deserialized serialized queue is usable
547     */
548    public void testSerialization() {
549        final SynchronousQueue x = new SynchronousQueue();
550        final SynchronousQueue y = new SynchronousQueue(false);
551        final SynchronousQueue z = new SynchronousQueue(true);
552        assertSerialEquals(x, y);
553        assertNotSerialEquals(x, z);
554        SynchronousQueue[] qs = { x, y, z };
555        for (SynchronousQueue q : qs) {
556            SynchronousQueue clone = serialClone(q);
557            assertNotSame(q, clone);
558            assertSerialEquals(q, clone);
559            assertTrue(clone.isEmpty());
560            assertEquals(0, clone.size());
561            assertEquals(0, clone.remainingCapacity());
562            assertFalse(clone.offer(zero));
563        }
564    }
565
566    /**
567     * drainTo(c) of empty queue doesn't transfer elements
568     */
569    public void testDrainTo()      { testDrainTo(false); }
570    public void testDrainTo_fair() { testDrainTo(true); }
571    public void testDrainTo(boolean fair) {
572        final SynchronousQueue q = new SynchronousQueue(fair);
573        ArrayList l = new ArrayList();
574        q.drainTo(l);
575        assertEquals(0, q.size());
576        assertEquals(0, l.size());
577    }
578
579    /**
580     * drainTo empties queue, unblocking a waiting put.
581     */
582    public void testDrainToWithActivePut()      { testDrainToWithActivePut(false); }
583    public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
584    public void testDrainToWithActivePut(boolean fair) {
585        final SynchronousQueue q = new SynchronousQueue(fair);
586        Thread t = newStartedThread(new CheckedRunnable() {
587            public void realRun() throws InterruptedException {
588                q.put(one);
589            }});
590
591        ArrayList l = new ArrayList();
592        long startTime = System.nanoTime();
593        while (l.isEmpty()) {
594            q.drainTo(l);
595            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
596                fail("timed out");
597            Thread.yield();
598        }
599        assertTrue(l.size() == 1);
600        assertSame(one, l.get(0));
601        awaitTermination(t);
602    }
603
604    /**
605     * drainTo(c, n) empties up to n elements of queue into c
606     */
607    public void testDrainToN() throws InterruptedException {
608        final SynchronousQueue q = new SynchronousQueue();
609        Thread t1 = newStartedThread(new CheckedRunnable() {
610            public void realRun() throws InterruptedException {
611                q.put(one);
612            }});
613
614        Thread t2 = newStartedThread(new CheckedRunnable() {
615            public void realRun() throws InterruptedException {
616                q.put(two);
617            }});
618
619        ArrayList l = new ArrayList();
620        int drained;
621        while ((drained = q.drainTo(l, 1)) == 0) Thread.yield();
622        assertEquals(1, drained);
623        assertEquals(1, l.size());
624        while ((drained = q.drainTo(l, 1)) == 0) Thread.yield();
625        assertEquals(1, drained);
626        assertEquals(2, l.size());
627        assertTrue(l.contains(one));
628        assertTrue(l.contains(two));
629        awaitTermination(t1);
630        awaitTermination(t2);
631    }
632
633    /**
634     * remove(null), contains(null) always return false
635     */
636    public void testNeverContainsNull() {
637        Collection<?> q = new SynchronousQueue();
638        assertFalse(q.contains(null));
639        assertFalse(q.remove(null));
640    }
641
642}
643