1/* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23/* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea and Martin Buchholz with assistance from 30 * members of JCP JSR-166 Expert Group and released to the public 31 * domain, as explained at 32 * http://creativecommons.org/publicdomain/zero/1.0/ 33 */ 34 35import java.util.concurrent.CompletableFuture; 36import java.util.concurrent.Executor; 37import java.util.concurrent.Executors; 38import java.util.concurrent.Flow; 39import java.util.concurrent.ForkJoinPool; 40import java.util.concurrent.SubmissionPublisher; 41import java.util.concurrent.atomic.AtomicInteger; 42import junit.framework.Test; 43import junit.framework.TestSuite; 44 45import static java.util.concurrent.Flow.Subscriber; 46import static java.util.concurrent.Flow.Subscription; 47import static java.util.concurrent.TimeUnit.MILLISECONDS; 48 49public class SubmissionPublisherTest extends JSR166TestCase { 50 51 public static void main(String[] args) { 52 main(suite(), args); 53 } 54 public static Test suite() { 55 return new TestSuite(SubmissionPublisherTest.class); 56 } 57 58 final Executor basicExecutor = basicPublisher().getExecutor(); 59 60 static SubmissionPublisher<Integer> basicPublisher() { 61 return new SubmissionPublisher<Integer>(); 62 } 63 64 static class SPException extends RuntimeException {} 65 66 class TestSubscriber implements Subscriber<Integer> { 67 volatile Subscription sn; 68 int last; // Requires that onNexts are in numeric order 69 volatile int nexts; 70 volatile int errors; 71 volatile int completes; 72 volatile boolean throwOnCall = false; 73 volatile boolean request = true; 74 volatile Throwable lastError; 75 76 public synchronized void onSubscribe(Subscription s) { 77 threadAssertTrue(sn == null); 78 sn = s; 79 notifyAll(); 80 if (throwOnCall) 81 throw new SPException(); 82 if (request) 83 sn.request(1L); 84 } 85 public synchronized void onNext(Integer t) { 86 ++nexts; 87 notifyAll(); 88 int current = t.intValue(); 89 threadAssertTrue(current >= last); 90 last = current; 91 if (request) 92 sn.request(1L); 93 if (throwOnCall) 94 throw new SPException(); 95 } 96 public synchronized void onError(Throwable t) { 97 threadAssertTrue(completes == 0); 98 threadAssertTrue(errors == 0); 99 lastError = t; 100 ++errors; 101 notifyAll(); 102 } 103 public synchronized void onComplete() { 104 threadAssertTrue(completes == 0); 105 ++completes; 106 notifyAll(); 107 } 108 109 synchronized void awaitSubscribe() { 110 while (sn == null) { 111 try { 112 wait(); 113 } catch (Exception ex) { 114 threadUnexpectedException(ex); 115 break; 116 } 117 } 118 } 119 synchronized void awaitNext(int n) { 120 while (nexts < n) { 121 try { 122 wait(); 123 } catch (Exception ex) { 124 threadUnexpectedException(ex); 125 break; 126 } 127 } 128 } 129 synchronized void awaitComplete() { 130 while (completes == 0 && errors == 0) { 131 try { 132 wait(); 133 } catch (Exception ex) { 134 threadUnexpectedException(ex); 135 break; 136 } 137 } 138 } 139 synchronized void awaitError() { 140 while (errors == 0) { 141 try { 142 wait(); 143 } catch (Exception ex) { 144 threadUnexpectedException(ex); 145 break; 146 } 147 } 148 } 149 150 } 151 152 /** 153 * A new SubmissionPublisher has no subscribers, a non-null 154 * executor, a power-of-two capacity, is not closed, and reports 155 * zero demand and lag 156 */ 157 void checkInitialState(SubmissionPublisher<?> p) { 158 assertFalse(p.hasSubscribers()); 159 assertEquals(0, p.getNumberOfSubscribers()); 160 assertTrue(p.getSubscribers().isEmpty()); 161 assertFalse(p.isClosed()); 162 assertNull(p.getClosedException()); 163 int n = p.getMaxBufferCapacity(); 164 assertTrue((n & (n - 1)) == 0); // power of two 165 assertNotNull(p.getExecutor()); 166 assertEquals(0, p.estimateMinimumDemand()); 167 assertEquals(0, p.estimateMaximumLag()); 168 } 169 170 /** 171 * A default-constructed SubmissionPublisher has no subscribers, 172 * is not closed, has default buffer size, and uses the 173 * defaultExecutor 174 */ 175 public void testConstructor1() { 176 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(); 177 checkInitialState(p); 178 assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize()); 179 Executor e = p.getExecutor(), c = ForkJoinPool.commonPool(); 180 if (ForkJoinPool.getCommonPoolParallelism() > 1) 181 assertSame(e, c); 182 else 183 assertNotSame(e, c); 184 } 185 186 /** 187 * A new SubmissionPublisher has no subscribers, is not closed, 188 * has the given buffer size, and uses the given executor 189 */ 190 public void testConstructor2() { 191 Executor e = Executors.newFixedThreadPool(1); 192 SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8); 193 checkInitialState(p); 194 assertSame(p.getExecutor(), e); 195 assertEquals(8, p.getMaxBufferCapacity()); 196 } 197 198 /** 199 * A null Executor argument to SubmissionPublisher constructor throws NPE 200 */ 201 public void testConstructor3() { 202 try { 203 new SubmissionPublisher<Integer>(null, 8); 204 shouldThrow(); 205 } catch (NullPointerException success) {} 206 } 207 208 /** 209 * A negative capacity argument to SubmissionPublisher constructor 210 * throws IAE 211 */ 212 public void testConstructor4() { 213 Executor e = Executors.newFixedThreadPool(1); 214 try { 215 new SubmissionPublisher<Integer>(e, -1); 216 shouldThrow(); 217 } catch (IllegalArgumentException success) {} 218 } 219 220 /** 221 * A closed publisher reports isClosed with no closedException and 222 * throws ISE upon attempted submission; a subsequent close or 223 * closeExceptionally has no additional effect. 224 */ 225 public void testClose() { 226 SubmissionPublisher<Integer> p = basicPublisher(); 227 checkInitialState(p); 228 p.close(); 229 assertTrue(p.isClosed()); 230 assertNull(p.getClosedException()); 231 try { 232 p.submit(1); 233 shouldThrow(); 234 } catch (IllegalStateException success) {} 235 Throwable ex = new SPException(); 236 p.closeExceptionally(ex); 237 assertTrue(p.isClosed()); 238 assertNull(p.getClosedException()); 239 } 240 241 /** 242 * A publisher closedExceptionally reports isClosed with the 243 * closedException and throws ISE upon attempted submission; a 244 * subsequent close or closeExceptionally has no additional 245 * effect. 246 */ 247 public void testCloseExceptionally() { 248 SubmissionPublisher<Integer> p = basicPublisher(); 249 checkInitialState(p); 250 Throwable ex = new SPException(); 251 p.closeExceptionally(ex); 252 assertTrue(p.isClosed()); 253 assertSame(p.getClosedException(), ex); 254 try { 255 p.submit(1); 256 shouldThrow(); 257 } catch (IllegalStateException success) {} 258 p.close(); 259 assertTrue(p.isClosed()); 260 assertSame(p.getClosedException(), ex); 261 } 262 263 /** 264 * Upon subscription, the subscriber's onSubscribe is called, no 265 * other Subscriber methods are invoked, the publisher 266 * hasSubscribers, isSubscribed is true, and existing 267 * subscriptions are unaffected. 268 */ 269 public void testSubscribe1() { 270 TestSubscriber s = new TestSubscriber(); 271 SubmissionPublisher<Integer> p = basicPublisher(); 272 p.subscribe(s); 273 assertTrue(p.hasSubscribers()); 274 assertEquals(1, p.getNumberOfSubscribers()); 275 assertTrue(p.getSubscribers().contains(s)); 276 assertTrue(p.isSubscribed(s)); 277 s.awaitSubscribe(); 278 assertNotNull(s.sn); 279 assertEquals(0, s.nexts); 280 assertEquals(0, s.errors); 281 assertEquals(0, s.completes); 282 TestSubscriber s2 = new TestSubscriber(); 283 p.subscribe(s2); 284 assertTrue(p.hasSubscribers()); 285 assertEquals(2, p.getNumberOfSubscribers()); 286 assertTrue(p.getSubscribers().contains(s)); 287 assertTrue(p.getSubscribers().contains(s2)); 288 assertTrue(p.isSubscribed(s)); 289 assertTrue(p.isSubscribed(s2)); 290 s2.awaitSubscribe(); 291 assertNotNull(s2.sn); 292 assertEquals(0, s2.nexts); 293 assertEquals(0, s2.errors); 294 assertEquals(0, s2.completes); 295 p.close(); 296 } 297 298 /** 299 * If closed, upon subscription, the subscriber's onComplete 300 * method is invoked 301 */ 302 public void testSubscribe2() { 303 TestSubscriber s = new TestSubscriber(); 304 SubmissionPublisher<Integer> p = basicPublisher(); 305 p.close(); 306 p.subscribe(s); 307 s.awaitComplete(); 308 assertEquals(0, s.nexts); 309 assertEquals(0, s.errors); 310 assertEquals(1, s.completes, 1); 311 } 312 313 /** 314 * If closedExceptionally, upon subscription, the subscriber's 315 * onError method is invoked 316 */ 317 public void testSubscribe3() { 318 TestSubscriber s = new TestSubscriber(); 319 SubmissionPublisher<Integer> p = basicPublisher(); 320 Throwable ex = new SPException(); 321 p.closeExceptionally(ex); 322 assertTrue(p.isClosed()); 323 assertSame(p.getClosedException(), ex); 324 p.subscribe(s); 325 s.awaitError(); 326 assertEquals(0, s.nexts); 327 assertEquals(1, s.errors); 328 } 329 330 /** 331 * Upon attempted resubscription, the subscriber's onError is 332 * called and the subscription is cancelled. 333 */ 334 public void testSubscribe4() { 335 TestSubscriber s = new TestSubscriber(); 336 SubmissionPublisher<Integer> p = basicPublisher(); 337 p.subscribe(s); 338 assertTrue(p.hasSubscribers()); 339 assertEquals(1, p.getNumberOfSubscribers()); 340 assertTrue(p.getSubscribers().contains(s)); 341 assertTrue(p.isSubscribed(s)); 342 s.awaitSubscribe(); 343 assertNotNull(s.sn); 344 assertEquals(0, s.nexts); 345 assertEquals(0, s.errors); 346 assertEquals(0, s.completes); 347 p.subscribe(s); 348 s.awaitError(); 349 assertEquals(0, s.nexts); 350 assertEquals(1, s.errors); 351 assertFalse(p.isSubscribed(s)); 352 } 353 354 /** 355 * An exception thrown in onSubscribe causes onError 356 */ 357 public void testSubscribe5() { 358 TestSubscriber s = new TestSubscriber(); 359 SubmissionPublisher<Integer> p = basicPublisher(); 360 s.throwOnCall = true; 361 try { 362 p.subscribe(s); 363 } catch (Exception ok) {} 364 s.awaitError(); 365 assertEquals(0, s.nexts); 366 assertEquals(1, s.errors); 367 assertEquals(0, s.completes); 368 } 369 370 /** 371 * subscribe(null) throws NPE 372 */ 373 public void testSubscribe6() { 374 SubmissionPublisher<Integer> p = basicPublisher(); 375 try { 376 p.subscribe(null); 377 shouldThrow(); 378 } catch (NullPointerException success) {} 379 checkInitialState(p); 380 } 381 382 /** 383 * Closing a publisher causes onComplete to subscribers 384 */ 385 public void testCloseCompletes() { 386 SubmissionPublisher<Integer> p = basicPublisher(); 387 TestSubscriber s1 = new TestSubscriber(); 388 TestSubscriber s2 = new TestSubscriber(); 389 p.subscribe(s1); 390 p.subscribe(s2); 391 p.submit(1); 392 p.close(); 393 assertTrue(p.isClosed()); 394 assertNull(p.getClosedException()); 395 s1.awaitComplete(); 396 assertEquals(1, s1.nexts); 397 assertEquals(1, s1.completes); 398 s2.awaitComplete(); 399 assertEquals(1, s2.nexts); 400 assertEquals(1, s2.completes); 401 } 402 403 /** 404 * Closing a publisher exceptionally causes onError to subscribers 405 * after they are subscribed 406 */ 407 public void testCloseExceptionallyError() { 408 SubmissionPublisher<Integer> p = basicPublisher(); 409 TestSubscriber s1 = new TestSubscriber(); 410 TestSubscriber s2 = new TestSubscriber(); 411 p.subscribe(s1); 412 p.subscribe(s2); 413 p.submit(1); 414 p.closeExceptionally(new SPException()); 415 assertTrue(p.isClosed()); 416 s1.awaitSubscribe(); 417 s1.awaitError(); 418 assertTrue(s1.nexts <= 1); 419 assertEquals(1, s1.errors); 420 s2.awaitSubscribe(); 421 s2.awaitError(); 422 assertTrue(s2.nexts <= 1); 423 assertEquals(1, s2.errors); 424 } 425 426 /** 427 * Cancelling a subscription eventually causes no more onNexts to be issued 428 */ 429 public void testCancel() { 430 SubmissionPublisher<Integer> p = basicPublisher(); 431 TestSubscriber s1 = new TestSubscriber(); 432 TestSubscriber s2 = new TestSubscriber(); 433 p.subscribe(s1); 434 p.subscribe(s2); 435 s1.awaitSubscribe(); 436 p.submit(1); 437 s1.sn.cancel(); 438 for (int i = 2; i <= 20; ++i) 439 p.submit(i); 440 p.close(); 441 s2.awaitComplete(); 442 assertEquals(20, s2.nexts); 443 assertEquals(1, s2.completes); 444 assertTrue(s1.nexts < 20); 445 assertFalse(p.isSubscribed(s1)); 446 } 447 448 /** 449 * Throwing an exception in onNext causes onError 450 */ 451 public void testThrowOnNext() { 452 SubmissionPublisher<Integer> p = basicPublisher(); 453 TestSubscriber s1 = new TestSubscriber(); 454 TestSubscriber s2 = new TestSubscriber(); 455 p.subscribe(s1); 456 p.subscribe(s2); 457 s1.awaitSubscribe(); 458 p.submit(1); 459 s1.throwOnCall = true; 460 p.submit(2); 461 p.close(); 462 s2.awaitComplete(); 463 assertEquals(2, s2.nexts); 464 s1.awaitComplete(); 465 assertEquals(1, s1.errors); 466 } 467 468 /** 469 * If a handler is supplied in constructor, it is invoked when 470 * subscriber throws an exception in onNext 471 */ 472 public void testThrowOnNextHandler() { 473 AtomicInteger calls = new AtomicInteger(); 474 SubmissionPublisher<Integer> p = new SubmissionPublisher<>( 475 basicExecutor, 8, (s, e) -> calls.getAndIncrement()); 476 TestSubscriber s1 = new TestSubscriber(); 477 TestSubscriber s2 = new TestSubscriber(); 478 p.subscribe(s1); 479 p.subscribe(s2); 480 s1.awaitSubscribe(); 481 p.submit(1); 482 s1.throwOnCall = true; 483 p.submit(2); 484 p.close(); 485 s2.awaitComplete(); 486 assertEquals(2, s2.nexts); 487 assertEquals(1, s2.completes); 488 s1.awaitError(); 489 assertEquals(1, s1.errors); 490 assertEquals(1, calls.get()); 491 } 492 493 /** 494 * onNext items are issued in the same order to each subscriber 495 */ 496 public void testOrder() { 497 SubmissionPublisher<Integer> p = basicPublisher(); 498 TestSubscriber s1 = new TestSubscriber(); 499 TestSubscriber s2 = new TestSubscriber(); 500 p.subscribe(s1); 501 p.subscribe(s2); 502 for (int i = 1; i <= 20; ++i) 503 p.submit(i); 504 p.close(); 505 s2.awaitComplete(); 506 s1.awaitComplete(); 507 assertEquals(20, s2.nexts); 508 assertEquals(1, s2.completes); 509 assertEquals(20, s1.nexts); 510 assertEquals(1, s1.completes); 511 } 512 513 /** 514 * onNext is issued only if requested 515 */ 516 public void testRequest1() { 517 SubmissionPublisher<Integer> p = basicPublisher(); 518 TestSubscriber s1 = new TestSubscriber(); 519 s1.request = false; 520 p.subscribe(s1); 521 s1.awaitSubscribe(); 522 assertTrue(p.estimateMinimumDemand() == 0); 523 TestSubscriber s2 = new TestSubscriber(); 524 p.subscribe(s2); 525 p.submit(1); 526 p.submit(2); 527 s2.awaitNext(1); 528 assertEquals(0, s1.nexts); 529 s1.sn.request(3); 530 p.submit(3); 531 p.close(); 532 s2.awaitComplete(); 533 assertEquals(3, s2.nexts); 534 assertEquals(1, s2.completes); 535 s1.awaitComplete(); 536 assertTrue(s1.nexts > 0); 537 assertEquals(1, s1.completes); 538 } 539 540 /** 541 * onNext is not issued when requests become zero 542 */ 543 public void testRequest2() { 544 SubmissionPublisher<Integer> p = basicPublisher(); 545 TestSubscriber s1 = new TestSubscriber(); 546 TestSubscriber s2 = new TestSubscriber(); 547 p.subscribe(s1); 548 p.subscribe(s2); 549 s2.awaitSubscribe(); 550 s1.awaitSubscribe(); 551 s1.request = false; 552 p.submit(1); 553 p.submit(2); 554 p.close(); 555 s2.awaitComplete(); 556 assertEquals(2, s2.nexts); 557 assertEquals(1, s2.completes); 558 s1.awaitNext(1); 559 assertEquals(1, s1.nexts); 560 } 561 562 /** 563 * Non-positive request causes error 564 */ 565 public void testRequest3() { 566 SubmissionPublisher<Integer> p = basicPublisher(); 567 TestSubscriber s1 = new TestSubscriber(); 568 TestSubscriber s2 = new TestSubscriber(); 569 TestSubscriber s3 = new TestSubscriber(); 570 p.subscribe(s1); 571 p.subscribe(s2); 572 p.subscribe(s3); 573 s3.awaitSubscribe(); 574 s2.awaitSubscribe(); 575 s1.awaitSubscribe(); 576 s1.sn.request(-1L); 577 s3.sn.request(0L); 578 p.submit(1); 579 p.submit(2); 580 p.close(); 581 s2.awaitComplete(); 582 assertEquals(2, s2.nexts); 583 assertEquals(1, s2.completes); 584 s1.awaitError(); 585 assertEquals(1, s1.errors); 586 assertTrue(s1.lastError instanceof IllegalArgumentException); 587 s3.awaitError(); 588 assertEquals(1, s3.errors); 589 assertTrue(s3.lastError instanceof IllegalArgumentException); 590 } 591 592 /** 593 * estimateMinimumDemand reports 0 until request, nonzero after 594 * request, and zero again after delivery 595 */ 596 public void testEstimateMinimumDemand() { 597 TestSubscriber s = new TestSubscriber(); 598 SubmissionPublisher<Integer> p = basicPublisher(); 599 s.request = false; 600 p.subscribe(s); 601 s.awaitSubscribe(); 602 assertEquals(0, p.estimateMinimumDemand()); 603 s.sn.request(1); 604 assertEquals(1, p.estimateMinimumDemand()); 605 p.submit(1); 606 s.awaitNext(1); 607 assertEquals(0, p.estimateMinimumDemand()); 608 } 609 610 /** 611 * submit to a publisher with no subscribers returns lag 0 612 */ 613 public void testEmptySubmit() { 614 SubmissionPublisher<Integer> p = basicPublisher(); 615 assertEquals(0, p.submit(1)); 616 } 617 618 /** 619 * submit(null) throws NPE 620 */ 621 public void testNullSubmit() { 622 SubmissionPublisher<Integer> p = basicPublisher(); 623 try { 624 p.submit(null); 625 shouldThrow(); 626 } catch (NullPointerException success) {} 627 } 628 629 /** 630 * submit returns number of lagged items, compatible with result 631 * of estimateMaximumLag. 632 */ 633 public void testLaggedSubmit() { 634 SubmissionPublisher<Integer> p = basicPublisher(); 635 TestSubscriber s1 = new TestSubscriber(); 636 s1.request = false; 637 TestSubscriber s2 = new TestSubscriber(); 638 s2.request = false; 639 p.subscribe(s1); 640 p.subscribe(s2); 641 s2.awaitSubscribe(); 642 s1.awaitSubscribe(); 643 assertEquals(1, p.submit(1)); 644 assertTrue(p.estimateMaximumLag() >= 1); 645 assertTrue(p.submit(2) >= 2); 646 assertTrue(p.estimateMaximumLag() >= 2); 647 s1.sn.request(4); 648 assertTrue(p.submit(3) >= 3); 649 assertTrue(p.estimateMaximumLag() >= 3); 650 s2.sn.request(4); 651 p.submit(4); 652 p.close(); 653 s2.awaitComplete(); 654 assertEquals(4, s2.nexts); 655 s1.awaitComplete(); 656 assertEquals(4, s2.nexts); 657 } 658 659 /** 660 * submit eventually issues requested items when buffer capacity is 1 661 */ 662 public void testCap1Submit() { 663 SubmissionPublisher<Integer> p 664 = new SubmissionPublisher<>(basicExecutor, 1); 665 TestSubscriber s1 = new TestSubscriber(); 666 TestSubscriber s2 = new TestSubscriber(); 667 p.subscribe(s1); 668 p.subscribe(s2); 669 for (int i = 1; i <= 20; ++i) { 670 assertTrue(p.estimateMinimumDemand() <= 1); 671 assertTrue(p.submit(i) >= 0); 672 } 673 p.close(); 674 s2.awaitComplete(); 675 s1.awaitComplete(); 676 assertEquals(20, s2.nexts); 677 assertEquals(1, s2.completes); 678 assertEquals(20, s1.nexts); 679 assertEquals(1, s1.completes); 680 } 681 682 static boolean noopHandle(AtomicInteger count) { 683 count.getAndIncrement(); 684 return false; 685 } 686 687 static boolean reqHandle(AtomicInteger count, Subscriber s) { 688 count.getAndIncrement(); 689 ((TestSubscriber)s).sn.request(Long.MAX_VALUE); 690 return true; 691 } 692 693 /** 694 * offer to a publisher with no subscribers returns lag 0 695 */ 696 public void testEmptyOffer() { 697 SubmissionPublisher<Integer> p = basicPublisher(); 698 assertEquals(0, p.offer(1, null)); 699 } 700 701 /** 702 * offer(null) throws NPE 703 */ 704 public void testNullOffer() { 705 SubmissionPublisher<Integer> p = basicPublisher(); 706 try { 707 p.offer(null, null); 708 shouldThrow(); 709 } catch (NullPointerException success) {} 710 } 711 712 /** 713 * offer returns number of lagged items if not saturated 714 */ 715 public void testLaggedOffer() { 716 SubmissionPublisher<Integer> p = basicPublisher(); 717 TestSubscriber s1 = new TestSubscriber(); 718 s1.request = false; 719 TestSubscriber s2 = new TestSubscriber(); 720 s2.request = false; 721 p.subscribe(s1); 722 p.subscribe(s2); 723 s2.awaitSubscribe(); 724 s1.awaitSubscribe(); 725 assertTrue(p.offer(1, null) >= 1); 726 assertTrue(p.offer(2, null) >= 2); 727 s1.sn.request(4); 728 assertTrue(p.offer(3, null) >= 3); 729 s2.sn.request(4); 730 p.offer(4, null); 731 p.close(); 732 s2.awaitComplete(); 733 assertEquals(4, s2.nexts); 734 s1.awaitComplete(); 735 assertEquals(4, s2.nexts); 736 } 737 738 /** 739 * offer reports drops if saturated 740 */ 741 public void testDroppedOffer() { 742 SubmissionPublisher<Integer> p 743 = new SubmissionPublisher<>(basicExecutor, 4); 744 TestSubscriber s1 = new TestSubscriber(); 745 s1.request = false; 746 TestSubscriber s2 = new TestSubscriber(); 747 s2.request = false; 748 p.subscribe(s1); 749 p.subscribe(s2); 750 s2.awaitSubscribe(); 751 s1.awaitSubscribe(); 752 for (int i = 1; i <= 4; ++i) 753 assertTrue(p.offer(i, null) >= 0); 754 p.offer(5, null); 755 assertTrue(p.offer(6, null) < 0); 756 s1.sn.request(64); 757 assertTrue(p.offer(7, null) < 0); 758 s2.sn.request(64); 759 p.close(); 760 s2.awaitComplete(); 761 assertTrue(s2.nexts >= 4); 762 s1.awaitComplete(); 763 assertTrue(s1.nexts >= 4); 764 } 765 766 /** 767 * offer invokes drop handler if saturated 768 */ 769 public void testHandledDroppedOffer() { 770 AtomicInteger calls = new AtomicInteger(); 771 SubmissionPublisher<Integer> p 772 = new SubmissionPublisher<>(basicExecutor, 4); 773 TestSubscriber s1 = new TestSubscriber(); 774 s1.request = false; 775 TestSubscriber s2 = new TestSubscriber(); 776 s2.request = false; 777 p.subscribe(s1); 778 p.subscribe(s2); 779 s2.awaitSubscribe(); 780 s1.awaitSubscribe(); 781 for (int i = 1; i <= 4; ++i) 782 assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0); 783 p.offer(4, (s, x) -> noopHandle(calls)); 784 assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0); 785 s1.sn.request(64); 786 assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0); 787 s2.sn.request(64); 788 p.close(); 789 s2.awaitComplete(); 790 s1.awaitComplete(); 791 assertTrue(calls.get() >= 4); 792 } 793 794 /** 795 * offer succeeds if drop handler forces request 796 */ 797 public void testRecoveredHandledDroppedOffer() { 798 AtomicInteger calls = new AtomicInteger(); 799 SubmissionPublisher<Integer> p 800 = new SubmissionPublisher<>(basicExecutor, 4); 801 TestSubscriber s1 = new TestSubscriber(); 802 s1.request = false; 803 TestSubscriber s2 = new TestSubscriber(); 804 s2.request = false; 805 p.subscribe(s1); 806 p.subscribe(s2); 807 s2.awaitSubscribe(); 808 s1.awaitSubscribe(); 809 int n = 0; 810 for (int i = 1; i <= 8; ++i) { 811 int d = p.offer(i, (s, x) -> reqHandle(calls, s)); 812 n = n + 2 + (d < 0 ? d : 0); 813 } 814 p.close(); 815 s2.awaitComplete(); 816 s1.awaitComplete(); 817 assertEquals(n, s1.nexts + s2.nexts); 818 assertTrue(calls.get() >= 2); 819 } 820 821 /** 822 * Timed offer to a publisher with no subscribers returns lag 0 823 */ 824 public void testEmptyTimedOffer() { 825 SubmissionPublisher<Integer> p = basicPublisher(); 826 long startTime = System.nanoTime(); 827 assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null)); 828 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 829 } 830 831 /** 832 * Timed offer with null item or TimeUnit throws NPE 833 */ 834 public void testNullTimedOffer() { 835 SubmissionPublisher<Integer> p = basicPublisher(); 836 long startTime = System.nanoTime(); 837 try { 838 p.offer(null, LONG_DELAY_MS, MILLISECONDS, null); 839 shouldThrow(); 840 } catch (NullPointerException success) {} 841 try { 842 p.offer(1, LONG_DELAY_MS, null, null); 843 shouldThrow(); 844 } catch (NullPointerException success) {} 845 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 846 } 847 848 /** 849 * Timed offer returns number of lagged items if not saturated 850 */ 851 public void testLaggedTimedOffer() { 852 SubmissionPublisher<Integer> p = basicPublisher(); 853 TestSubscriber s1 = new TestSubscriber(); 854 s1.request = false; 855 TestSubscriber s2 = new TestSubscriber(); 856 s2.request = false; 857 p.subscribe(s1); 858 p.subscribe(s2); 859 s2.awaitSubscribe(); 860 s1.awaitSubscribe(); 861 long startTime = System.nanoTime(); 862 assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1); 863 assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2); 864 s1.sn.request(4); 865 assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3); 866 s2.sn.request(4); 867 p.offer(4, LONG_DELAY_MS, MILLISECONDS, null); 868 p.close(); 869 s2.awaitComplete(); 870 assertEquals(4, s2.nexts); 871 s1.awaitComplete(); 872 assertEquals(4, s2.nexts); 873 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2); 874 } 875 876 /** 877 * Timed offer reports drops if saturated 878 */ 879 public void testDroppedTimedOffer() { 880 SubmissionPublisher<Integer> p 881 = new SubmissionPublisher<>(basicExecutor, 4); 882 TestSubscriber s1 = new TestSubscriber(); 883 s1.request = false; 884 TestSubscriber s2 = new TestSubscriber(); 885 s2.request = false; 886 p.subscribe(s1); 887 p.subscribe(s2); 888 s2.awaitSubscribe(); 889 s1.awaitSubscribe(); 890 long delay = timeoutMillis(); 891 for (int i = 1; i <= 4; ++i) 892 assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0); 893 long startTime = System.nanoTime(); 894 assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0); 895 s1.sn.request(64); 896 assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0); 897 // 2 * delay should elapse but check only 1 * delay to allow timer slop 898 assertTrue(millisElapsedSince(startTime) >= delay); 899 s2.sn.request(64); 900 p.close(); 901 s2.awaitComplete(); 902 assertTrue(s2.nexts >= 2); 903 s1.awaitComplete(); 904 assertTrue(s1.nexts >= 2); 905 } 906 907 /** 908 * Timed offer invokes drop handler if saturated 909 */ 910 public void testHandledDroppedTimedOffer() { 911 AtomicInteger calls = new AtomicInteger(); 912 SubmissionPublisher<Integer> p 913 = new SubmissionPublisher<>(basicExecutor, 4); 914 TestSubscriber s1 = new TestSubscriber(); 915 s1.request = false; 916 TestSubscriber s2 = new TestSubscriber(); 917 s2.request = false; 918 p.subscribe(s1); 919 p.subscribe(s2); 920 s2.awaitSubscribe(); 921 s1.awaitSubscribe(); 922 long delay = timeoutMillis(); 923 for (int i = 1; i <= 4; ++i) 924 assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0); 925 long startTime = System.nanoTime(); 926 assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); 927 s1.sn.request(64); 928 assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0); 929 assertTrue(millisElapsedSince(startTime) >= delay); 930 s2.sn.request(64); 931 p.close(); 932 s2.awaitComplete(); 933 s1.awaitComplete(); 934 assertTrue(calls.get() >= 2); 935 } 936 937 /** 938 * Timed offer succeeds if drop handler forces request 939 */ 940 public void testRecoveredHandledDroppedTimedOffer() { 941 AtomicInteger calls = new AtomicInteger(); 942 SubmissionPublisher<Integer> p 943 = new SubmissionPublisher<>(basicExecutor, 4); 944 TestSubscriber s1 = new TestSubscriber(); 945 s1.request = false; 946 TestSubscriber s2 = new TestSubscriber(); 947 s2.request = false; 948 p.subscribe(s1); 949 p.subscribe(s2); 950 s2.awaitSubscribe(); 951 s1.awaitSubscribe(); 952 int n = 0; 953 long delay = timeoutMillis(); 954 long startTime = System.nanoTime(); 955 for (int i = 1; i <= 6; ++i) { 956 int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s)); 957 n = n + 2 + (d < 0 ? d : 0); 958 } 959 assertTrue(millisElapsedSince(startTime) >= delay); 960 p.close(); 961 s2.awaitComplete(); 962 s1.awaitComplete(); 963 assertEquals(n, s1.nexts + s2.nexts); 964 assertTrue(calls.get() >= 2); 965 } 966 967 /** 968 * consume returns a CompletableFuture that is done when 969 * publisher completes 970 */ 971 public void testConsume() { 972 AtomicInteger sum = new AtomicInteger(); 973 SubmissionPublisher<Integer> p = basicPublisher(); 974 CompletableFuture<Void> f = 975 p.consume((Integer x) -> sum.getAndAdd(x.intValue())); 976 int n = 20; 977 for (int i = 1; i <= n; ++i) 978 p.submit(i); 979 p.close(); 980 f.join(); 981 assertEquals((n * (n + 1)) / 2, sum.get()); 982 } 983 984 /** 985 * consume(null) throws NPE 986 */ 987 public void testConsumeNPE() { 988 SubmissionPublisher<Integer> p = basicPublisher(); 989 try { 990 CompletableFuture<Void> f = p.consume(null); 991 shouldThrow(); 992 } catch (NullPointerException success) {} 993 } 994 995 /** 996 * consume eventually stops processing published items if cancelled 997 */ 998 public void testCancelledConsume() { 999 AtomicInteger count = new AtomicInteger(); 1000 SubmissionPublisher<Integer> p = basicPublisher(); 1001 CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement()); 1002 f.cancel(true); 1003 int n = 1000000; // arbitrary limit 1004 for (int i = 1; i <= n; ++i) 1005 p.submit(i); 1006 assertTrue(count.get() < n); 1007 } 1008 1009} 1010