1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23/* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 38import java.util.ArrayList; 39import java.util.Arrays; 40import java.util.Collection; 41import java.util.Iterator; 42import java.util.NoSuchElementException; 43import java.util.concurrent.BlockingQueue; 44import java.util.concurrent.CountDownLatch; 45import java.util.concurrent.Executors; 46import java.util.concurrent.ExecutorService; 47import java.util.concurrent.SynchronousQueue; 48 49import junit.framework.Test; 50 51public class SynchronousQueueTest extends JSR166TestCase { 52 53 public static class Fair extends BlockingQueueTest { 54 protected BlockingQueue emptyCollection() { 55 return new SynchronousQueue(true); 56 } 57 } 58 59 public static class NonFair extends BlockingQueueTest { 60 protected BlockingQueue emptyCollection() { 61 return new SynchronousQueue(false); 62 } 63 } 64 65 public static void main(String[] args) { 66 main(suite(), args); 67 } 68 69 public static Test suite() { 70 return newTestSuite(SynchronousQueueTest.class, 71 new Fair().testSuite(), 72 new NonFair().testSuite()); 73 } 74 75 /** 76 * Any SynchronousQueue is both empty and full 77 */ 78 public void testEmptyFull() { testEmptyFull(false); } 79 public void testEmptyFull_fair() { testEmptyFull(true); } 80 public void testEmptyFull(boolean fair) { 81 final SynchronousQueue q = new SynchronousQueue(fair); 82 assertTrue(q.isEmpty()); 83 assertEquals(0, q.size()); 84 assertEquals(0, q.remainingCapacity()); 85 assertFalse(q.offer(zero)); 86 } 87 88 /** 89 * offer fails if no active taker 90 */ 91 public void testOffer() { testOffer(false); } 92 public void testOffer_fair() { testOffer(true); } 93 public void testOffer(boolean fair) { 94 SynchronousQueue q = new SynchronousQueue(fair); 95 assertFalse(q.offer(one)); 96 } 97 98 /** 99 * add throws IllegalStateException if no active taker 100 */ 101 public void testAdd() { testAdd(false); } 102 public void testAdd_fair() { testAdd(true); } 103 public void testAdd(boolean fair) { 104 SynchronousQueue q = new SynchronousQueue(fair); 105 assertEquals(0, q.remainingCapacity()); 106 try { 107 q.add(one); 108 shouldThrow(); 109 } catch (IllegalStateException success) {} 110 } 111 112 /** 113 * addAll(this) throws IllegalArgumentException 114 */ 115 public void testAddAll_self() { testAddAll_self(false); } 116 public void testAddAll_self_fair() { testAddAll_self(true); } 117 public void testAddAll_self(boolean fair) { 118 SynchronousQueue q = new SynchronousQueue(fair); 119 try { 120 q.addAll(q); 121 shouldThrow(); 122 } catch (IllegalArgumentException success) {} 123 } 124 125 /** 126 * addAll throws ISE if no active taker 127 */ 128 public void testAddAll_ISE() { testAddAll_ISE(false); } 129 public void testAddAll_ISE_fair() { testAddAll_ISE(true); } 130 public void testAddAll_ISE(boolean fair) { 131 SynchronousQueue q = new SynchronousQueue(fair); 132 Integer[] ints = new Integer[1]; 133 for (int i = 0; i < ints.length; i++) 134 ints[i] = i; 135 Collection<Integer> coll = Arrays.asList(ints); 136 try { 137 q.addAll(coll); 138 shouldThrow(); 139 } catch (IllegalStateException success) {} 140 } 141 142 /** 143 * put blocks interruptibly if no active taker 144 */ 145 public void testBlockingPut() { testBlockingPut(false); } 146 public void testBlockingPut_fair() { testBlockingPut(true); } 147 public void testBlockingPut(boolean fair) { 148 final SynchronousQueue q = new SynchronousQueue(fair); 149 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 150 Thread t = newStartedThread(new CheckedRunnable() { 151 public void realRun() throws InterruptedException { 152 Thread.currentThread().interrupt(); 153 try { 154 q.put(99); 155 shouldThrow(); 156 } catch (InterruptedException success) {} 157 assertFalse(Thread.interrupted()); 158 159 pleaseInterrupt.countDown(); 160 try { 161 q.put(99); 162 shouldThrow(); 163 } catch (InterruptedException success) {} 164 assertFalse(Thread.interrupted()); 165 }}); 166 167 await(pleaseInterrupt); 168 assertThreadStaysAlive(t); 169 t.interrupt(); 170 awaitTermination(t); 171 assertEquals(0, q.remainingCapacity()); 172 } 173 174 /** 175 * put blocks interruptibly waiting for take 176 */ 177 public void testPutWithTake() { testPutWithTake(false); } 178 public void testPutWithTake_fair() { testPutWithTake(true); } 179 public void testPutWithTake(boolean fair) { 180 final SynchronousQueue q = new SynchronousQueue(fair); 181 final CountDownLatch pleaseTake = new CountDownLatch(1); 182 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 183 Thread t = newStartedThread(new CheckedRunnable() { 184 public void realRun() throws InterruptedException { 185 pleaseTake.countDown(); 186 q.put(one); 187 188 pleaseInterrupt.countDown(); 189 try { 190 q.put(99); 191 shouldThrow(); 192 } catch (InterruptedException success) {} 193 assertFalse(Thread.interrupted()); 194 }}); 195 196 await(pleaseTake); 197 assertEquals(0, q.remainingCapacity()); 198 try { assertSame(one, q.take()); } 199 catch (InterruptedException e) { threadUnexpectedException(e); } 200 201 await(pleaseInterrupt); 202 assertThreadStaysAlive(t); 203 t.interrupt(); 204 awaitTermination(t); 205 assertEquals(0, q.remainingCapacity()); 206 } 207 208 /** 209 * timed offer times out if elements not taken 210 */ 211 public void testTimedOffer() { testTimedOffer(false); } 212 public void testTimedOffer_fair() { testTimedOffer(true); } 213 public void testTimedOffer(boolean fair) { 214 final SynchronousQueue q = new SynchronousQueue(fair); 215 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 216 Thread t = newStartedThread(new CheckedRunnable() { 217 public void realRun() throws InterruptedException { 218 long startTime = System.nanoTime(); 219 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS)); 220 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 221 pleaseInterrupt.countDown(); 222 try { 223 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS); 224 shouldThrow(); 225 } catch (InterruptedException success) {} 226 }}); 227 228 await(pleaseInterrupt); 229 assertThreadStaysAlive(t); 230 t.interrupt(); 231 awaitTermination(t); 232 } 233 234 /** 235 * poll return null if no active putter 236 */ 237 public void testPoll() { testPoll(false); } 238 public void testPoll_fair() { testPoll(true); } 239 public void testPoll(boolean fair) { 240 final SynchronousQueue q = new SynchronousQueue(fair); 241 assertNull(q.poll()); 242 } 243 244 /** 245 * timed poll with zero timeout times out if no active putter 246 */ 247 public void testTimedPoll0() { testTimedPoll0(false); } 248 public void testTimedPoll0_fair() { testTimedPoll0(true); } 249 public void testTimedPoll0(boolean fair) { 250 final SynchronousQueue q = new SynchronousQueue(fair); 251 try { assertNull(q.poll(0, MILLISECONDS)); } 252 catch (InterruptedException e) { threadUnexpectedException(e); } 253 } 254 255 /** 256 * timed poll with nonzero timeout times out if no active putter 257 */ 258 public void testTimedPoll() { testTimedPoll(false); } 259 public void testTimedPoll_fair() { testTimedPoll(true); } 260 public void testTimedPoll(boolean fair) { 261 final SynchronousQueue q = new SynchronousQueue(fair); 262 long startTime = System.nanoTime(); 263 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); } 264 catch (InterruptedException e) { threadUnexpectedException(e); } 265 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 266 } 267 268 /** 269 * timed poll before a delayed offer times out, returning null; 270 * after offer succeeds; on interruption throws 271 */ 272 public void testTimedPollWithOffer() { testTimedPollWithOffer(false); } 273 public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); } 274 public void testTimedPollWithOffer(boolean fair) { 275 final SynchronousQueue q = new SynchronousQueue(fair); 276 final CountDownLatch pleaseOffer = new CountDownLatch(1); 277 final CountDownLatch pleaseInterrupt = new CountDownLatch(1); 278 Thread t = newStartedThread(new CheckedRunnable() { 279 public void realRun() throws InterruptedException { 280 long startTime = System.nanoTime(); 281 assertNull(q.poll(timeoutMillis(), MILLISECONDS)); 282 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 283 284 pleaseOffer.countDown(); 285 startTime = System.nanoTime(); 286 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS)); 287 288 Thread.currentThread().interrupt(); 289 try { 290 q.poll(LONG_DELAY_MS, MILLISECONDS); 291 shouldThrow(); 292 } catch (InterruptedException success) {} 293 assertFalse(Thread.interrupted()); 294 295 pleaseInterrupt.countDown(); 296 try { 297 q.poll(LONG_DELAY_MS, MILLISECONDS); 298 shouldThrow(); 299 } catch (InterruptedException success) {} 300 assertFalse(Thread.interrupted()); 301 302 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 303 }}); 304 305 await(pleaseOffer); 306 long startTime = System.nanoTime(); 307 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); } 308 catch (InterruptedException e) { threadUnexpectedException(e); } 309 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 310 311 await(pleaseInterrupt); 312 assertThreadStaysAlive(t); 313 t.interrupt(); 314 awaitTermination(t); 315 } 316 317 /** 318 * peek() returns null if no active putter 319 */ 320 public void testPeek() { testPeek(false); } 321 public void testPeek_fair() { testPeek(true); } 322 public void testPeek(boolean fair) { 323 final SynchronousQueue q = new SynchronousQueue(fair); 324 assertNull(q.peek()); 325 } 326 327 /** 328 * element() throws NoSuchElementException if no active putter 329 */ 330 public void testElement() { testElement(false); } 331 public void testElement_fair() { testElement(true); } 332 public void testElement(boolean fair) { 333 final SynchronousQueue q = new SynchronousQueue(fair); 334 try { 335 q.element(); 336 shouldThrow(); 337 } catch (NoSuchElementException success) {} 338 } 339 340 /** 341 * remove() throws NoSuchElementException if no active putter 342 */ 343 public void testRemove() { testRemove(false); } 344 public void testRemove_fair() { testRemove(true); } 345 public void testRemove(boolean fair) { 346 final SynchronousQueue q = new SynchronousQueue(fair); 347 try { 348 q.remove(); 349 shouldThrow(); 350 } catch (NoSuchElementException success) {} 351 } 352 353 /** 354 * contains returns false 355 */ 356 public void testContains() { testContains(false); } 357 public void testContains_fair() { testContains(true); } 358 public void testContains(boolean fair) { 359 final SynchronousQueue q = new SynchronousQueue(fair); 360 assertFalse(q.contains(zero)); 361 } 362 363 /** 364 * clear ensures isEmpty 365 */ 366 public void testClear() { testClear(false); } 367 public void testClear_fair() { testClear(true); } 368 public void testClear(boolean fair) { 369 final SynchronousQueue q = new SynchronousQueue(fair); 370 q.clear(); 371 assertTrue(q.isEmpty()); 372 } 373 374 /** 375 * containsAll returns false unless empty 376 */ 377 public void testContainsAll() { testContainsAll(false); } 378 public void testContainsAll_fair() { testContainsAll(true); } 379 public void testContainsAll(boolean fair) { 380 final SynchronousQueue q = new SynchronousQueue(fair); 381 Integer[] empty = new Integer[0]; 382 assertTrue(q.containsAll(Arrays.asList(empty))); 383 Integer[] ints = new Integer[1]; ints[0] = zero; 384 assertFalse(q.containsAll(Arrays.asList(ints))); 385 } 386 387 /** 388 * retainAll returns false 389 */ 390 public void testRetainAll() { testRetainAll(false); } 391 public void testRetainAll_fair() { testRetainAll(true); } 392 public void testRetainAll(boolean fair) { 393 final SynchronousQueue q = new SynchronousQueue(fair); 394 Integer[] empty = new Integer[0]; 395 assertFalse(q.retainAll(Arrays.asList(empty))); 396 Integer[] ints = new Integer[1]; ints[0] = zero; 397 assertFalse(q.retainAll(Arrays.asList(ints))); 398 } 399 400 /** 401 * removeAll returns false 402 */ 403 public void testRemoveAll() { testRemoveAll(false); } 404 public void testRemoveAll_fair() { testRemoveAll(true); } 405 public void testRemoveAll(boolean fair) { 406 final SynchronousQueue q = new SynchronousQueue(fair); 407 Integer[] empty = new Integer[0]; 408 assertFalse(q.removeAll(Arrays.asList(empty))); 409 Integer[] ints = new Integer[1]; ints[0] = zero; 410 assertFalse(q.containsAll(Arrays.asList(ints))); 411 } 412 413 /** 414 * toArray is empty 415 */ 416 public void testToArray() { testToArray(false); } 417 public void testToArray_fair() { testToArray(true); } 418 public void testToArray(boolean fair) { 419 final SynchronousQueue q = new SynchronousQueue(fair); 420 Object[] o = q.toArray(); 421 assertEquals(0, o.length); 422 } 423 424 /** 425 * toArray(Integer array) returns its argument with the first 426 * element (if present) nulled out 427 */ 428 public void testToArray2() { testToArray2(false); } 429 public void testToArray2_fair() { testToArray2(true); } 430 public void testToArray2(boolean fair) { 431 final SynchronousQueue<Integer> q = new SynchronousQueue<>(fair); 432 Integer[] a; 433 434 a = new Integer[0]; 435 assertSame(a, q.toArray(a)); 436 437 a = new Integer[3]; 438 Arrays.fill(a, 42); 439 assertSame(a, q.toArray(a)); 440 assertNull(a[0]); 441 for (int i = 1; i < a.length; i++) 442 assertEquals(42, (int) a[i]); 443 } 444 445 /** 446 * toArray(null) throws NPE 447 */ 448 public void testToArray_null() { testToArray_null(false); } 449 public void testToArray_null_fair() { testToArray_null(true); } 450 public void testToArray_null(boolean fair) { 451 final SynchronousQueue q = new SynchronousQueue(fair); 452 try { 453 Object[] o = q.toArray(null); 454 shouldThrow(); 455 } catch (NullPointerException success) {} 456 } 457 458 /** 459 * iterator does not traverse any elements 460 */ 461 public void testIterator() { testIterator(false); } 462 public void testIterator_fair() { testIterator(true); } 463 public void testIterator(boolean fair) { 464 assertIteratorExhausted(new SynchronousQueue(fair).iterator()); 465 } 466 467 /** 468 * iterator remove throws ISE 469 */ 470 public void testIteratorRemove() { testIteratorRemove(false); } 471 public void testIteratorRemove_fair() { testIteratorRemove(true); } 472 public void testIteratorRemove(boolean fair) { 473 final SynchronousQueue q = new SynchronousQueue(fair); 474 Iterator it = q.iterator(); 475 try { 476 it.remove(); 477 shouldThrow(); 478 } catch (IllegalStateException success) {} 479 } 480 481 /** 482 * toString returns a non-null string 483 */ 484 public void testToString() { testToString(false); } 485 public void testToString_fair() { testToString(true); } 486 public void testToString(boolean fair) { 487 final SynchronousQueue q = new SynchronousQueue(fair); 488 String s = q.toString(); 489 assertNotNull(s); 490 } 491 492 /** 493 * offer transfers elements across Executor tasks 494 */ 495 public void testOfferInExecutor() { testOfferInExecutor(false); } 496 public void testOfferInExecutor_fair() { testOfferInExecutor(true); } 497 public void testOfferInExecutor(boolean fair) { 498 final SynchronousQueue q = new SynchronousQueue(fair); 499 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 500 final ExecutorService executor = Executors.newFixedThreadPool(2); 501 try (PoolCleaner cleaner = cleaner(executor)) { 502 503 executor.execute(new CheckedRunnable() { 504 public void realRun() throws InterruptedException { 505 assertFalse(q.offer(one)); 506 threadsStarted.await(); 507 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS)); 508 assertEquals(0, q.remainingCapacity()); 509 }}); 510 511 executor.execute(new CheckedRunnable() { 512 public void realRun() throws InterruptedException { 513 threadsStarted.await(); 514 assertSame(one, q.take()); 515 }}); 516 } 517 } 518 519 /** 520 * timed poll retrieves elements across Executor threads 521 */ 522 public void testPollInExecutor() { testPollInExecutor(false); } 523 public void testPollInExecutor_fair() { testPollInExecutor(true); } 524 public void testPollInExecutor(boolean fair) { 525 final SynchronousQueue q = new SynchronousQueue(fair); 526 final CheckedBarrier threadsStarted = new CheckedBarrier(2); 527 final ExecutorService executor = Executors.newFixedThreadPool(2); 528 try (PoolCleaner cleaner = cleaner(executor)) { 529 executor.execute(new CheckedRunnable() { 530 public void realRun() throws InterruptedException { 531 assertNull(q.poll()); 532 threadsStarted.await(); 533 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS)); 534 assertTrue(q.isEmpty()); 535 }}); 536 537 executor.execute(new CheckedRunnable() { 538 public void realRun() throws InterruptedException { 539 threadsStarted.await(); 540 q.put(one); 541 }}); 542 } 543 } 544 545 /** 546 * a deserialized serialized queue is usable 547 */ 548 public void testSerialization() { 549 final SynchronousQueue x = new SynchronousQueue(); 550 final SynchronousQueue y = new SynchronousQueue(false); 551 final SynchronousQueue z = new SynchronousQueue(true); 552 assertSerialEquals(x, y); 553 assertNotSerialEquals(x, z); 554 SynchronousQueue[] qs = { x, y, z }; 555 for (SynchronousQueue q : qs) { 556 SynchronousQueue clone = serialClone(q); 557 assertNotSame(q, clone); 558 assertSerialEquals(q, clone); 559 assertTrue(clone.isEmpty()); 560 assertEquals(0, clone.size()); 561 assertEquals(0, clone.remainingCapacity()); 562 assertFalse(clone.offer(zero)); 563 } 564 } 565 566 /** 567 * drainTo(c) of empty queue doesn't transfer elements 568 */ 569 public void testDrainTo() { testDrainTo(false); } 570 public void testDrainTo_fair() { testDrainTo(true); } 571 public void testDrainTo(boolean fair) { 572 final SynchronousQueue q = new SynchronousQueue(fair); 573 ArrayList l = new ArrayList(); 574 q.drainTo(l); 575 assertEquals(0, q.size()); 576 assertEquals(0, l.size()); 577 } 578 579 /** 580 * drainTo empties queue, unblocking a waiting put. 581 */ 582 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); } 583 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); } 584 public void testDrainToWithActivePut(boolean fair) { 585 final SynchronousQueue q = new SynchronousQueue(fair); 586 Thread t = newStartedThread(new CheckedRunnable() { 587 public void realRun() throws InterruptedException { 588 q.put(one); 589 }}); 590 591 ArrayList l = new ArrayList(); 592 long startTime = System.nanoTime(); 593 while (l.isEmpty()) { 594 q.drainTo(l); 595 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 596 fail("timed out"); 597 Thread.yield(); 598 } 599 assertTrue(l.size() == 1); 600 assertSame(one, l.get(0)); 601 awaitTermination(t); 602 } 603 604 /** 605 * drainTo(c, n) empties up to n elements of queue into c 606 */ 607 public void testDrainToN() throws InterruptedException { 608 final SynchronousQueue q = new SynchronousQueue(); 609 Thread t1 = newStartedThread(new CheckedRunnable() { 610 public void realRun() throws InterruptedException { 611 q.put(one); 612 }}); 613 614 Thread t2 = newStartedThread(new CheckedRunnable() { 615 public void realRun() throws InterruptedException { 616 q.put(two); 617 }}); 618 619 ArrayList l = new ArrayList(); 620 int drained; 621 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 622 assertEquals(1, drained); 623 assertEquals(1, l.size()); 624 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield(); 625 assertEquals(1, drained); 626 assertEquals(2, l.size()); 627 assertTrue(l.contains(one)); 628 assertTrue(l.contains(two)); 629 awaitTermination(t1); 630 awaitTermination(t2); 631 } 632 633 /** 634 * remove(null), contains(null) always return false 635 */ 636 public void testNeverContainsNull() { 637 Collection<?> q = new SynchronousQueue(); 638 assertFalse(q.contains(null)); 639 assertFalse(q.remove(null)); 640 } 641 642} 643