1/*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 * version 2 for more details (a copy is included in the LICENSE file that
12 * accompanied this code).
13 *
14 * You should have received a copy of the GNU General Public License version
15 * 2 along with this work; if not, write to the Free Software Foundation,
16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23/*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea and Martin Buchholz with assistance from
30 * members of JCP JSR-166 Expert Group and released to the public
31 * domain, as explained at
32 * http://creativecommons.org/publicdomain/zero/1.0/
33 */
34
35import java.util.concurrent.CompletableFuture;
36import java.util.concurrent.Executor;
37import java.util.concurrent.Executors;
38import java.util.concurrent.Flow;
39import java.util.concurrent.ForkJoinPool;
40import java.util.concurrent.SubmissionPublisher;
41import java.util.concurrent.atomic.AtomicInteger;
42import junit.framework.Test;
43import junit.framework.TestSuite;
44
45import static java.util.concurrent.Flow.Subscriber;
46import static java.util.concurrent.Flow.Subscription;
47import static java.util.concurrent.TimeUnit.MILLISECONDS;
48
49public class SubmissionPublisherTest extends JSR166TestCase {
50
51    public static void main(String[] args) {
52        main(suite(), args);
53    }
54    public static Test suite() {
55        return new TestSuite(SubmissionPublisherTest.class);
56    }
57
58    final Executor basicExecutor = basicPublisher().getExecutor();
59
60    static SubmissionPublisher<Integer> basicPublisher() {
61        return new SubmissionPublisher<Integer>();
62    }
63
64    static class SPException extends RuntimeException {}
65
66    class TestSubscriber implements Subscriber<Integer> {
67        volatile Subscription sn;
68        int last;  // Requires that onNexts are in numeric order
69        volatile int nexts;
70        volatile int errors;
71        volatile int completes;
72        volatile boolean throwOnCall = false;
73        volatile boolean request = true;
74        volatile Throwable lastError;
75
76        public synchronized void onSubscribe(Subscription s) {
77            threadAssertTrue(sn == null);
78            sn = s;
79            notifyAll();
80            if (throwOnCall)
81                throw new SPException();
82            if (request)
83                sn.request(1L);
84        }
85        public synchronized void onNext(Integer t) {
86            ++nexts;
87            notifyAll();
88            int current = t.intValue();
89            threadAssertTrue(current >= last);
90            last = current;
91            if (request)
92                sn.request(1L);
93            if (throwOnCall)
94                throw new SPException();
95        }
96        public synchronized void onError(Throwable t) {
97            threadAssertTrue(completes == 0);
98            threadAssertTrue(errors == 0);
99            lastError = t;
100            ++errors;
101            notifyAll();
102        }
103        public synchronized void onComplete() {
104            threadAssertTrue(completes == 0);
105            ++completes;
106            notifyAll();
107        }
108
109        synchronized void awaitSubscribe() {
110            while (sn == null) {
111                try {
112                    wait();
113                } catch (Exception ex) {
114                    threadUnexpectedException(ex);
115                    break;
116                }
117            }
118        }
119        synchronized void awaitNext(int n) {
120            while (nexts < n) {
121                try {
122                    wait();
123                } catch (Exception ex) {
124                    threadUnexpectedException(ex);
125                    break;
126                }
127            }
128        }
129        synchronized void awaitComplete() {
130            while (completes == 0 && errors == 0) {
131                try {
132                    wait();
133                } catch (Exception ex) {
134                    threadUnexpectedException(ex);
135                    break;
136                }
137            }
138        }
139        synchronized void awaitError() {
140            while (errors == 0) {
141                try {
142                    wait();
143                } catch (Exception ex) {
144                    threadUnexpectedException(ex);
145                    break;
146                }
147            }
148        }
149
150    }
151
152    /**
153     * A new SubmissionPublisher has no subscribers, a non-null
154     * executor, a power-of-two capacity, is not closed, and reports
155     * zero demand and lag
156     */
157    void checkInitialState(SubmissionPublisher<?> p) {
158        assertFalse(p.hasSubscribers());
159        assertEquals(0, p.getNumberOfSubscribers());
160        assertTrue(p.getSubscribers().isEmpty());
161        assertFalse(p.isClosed());
162        assertNull(p.getClosedException());
163        int n = p.getMaxBufferCapacity();
164        assertTrue((n & (n - 1)) == 0); // power of two
165        assertNotNull(p.getExecutor());
166        assertEquals(0, p.estimateMinimumDemand());
167        assertEquals(0, p.estimateMaximumLag());
168    }
169
170    /**
171     * A default-constructed SubmissionPublisher has no subscribers,
172     * is not closed, has default buffer size, and uses the
173     * defaultExecutor
174     */
175    public void testConstructor1() {
176        SubmissionPublisher<Integer> p = new SubmissionPublisher<>();
177        checkInitialState(p);
178        assertEquals(p.getMaxBufferCapacity(), Flow.defaultBufferSize());
179        Executor e = p.getExecutor(), c = ForkJoinPool.commonPool();
180        if (ForkJoinPool.getCommonPoolParallelism() > 1)
181            assertSame(e, c);
182        else
183            assertNotSame(e, c);
184    }
185
186    /**
187     * A new SubmissionPublisher has no subscribers, is not closed,
188     * has the given buffer size, and uses the given executor
189     */
190    public void testConstructor2() {
191        Executor e = Executors.newFixedThreadPool(1);
192        SubmissionPublisher<Integer> p = new SubmissionPublisher<>(e, 8);
193        checkInitialState(p);
194        assertSame(p.getExecutor(), e);
195        assertEquals(8, p.getMaxBufferCapacity());
196    }
197
198    /**
199     * A null Executor argument to SubmissionPublisher constructor throws NPE
200     */
201    public void testConstructor3() {
202        try {
203            new SubmissionPublisher<Integer>(null, 8);
204            shouldThrow();
205        } catch (NullPointerException success) {}
206    }
207
208    /**
209     * A negative capacity argument to SubmissionPublisher constructor
210     * throws IAE
211     */
212    public void testConstructor4() {
213        Executor e = Executors.newFixedThreadPool(1);
214        try {
215            new SubmissionPublisher<Integer>(e, -1);
216            shouldThrow();
217        } catch (IllegalArgumentException success) {}
218    }
219
220    /**
221     * A closed publisher reports isClosed with no closedException and
222     * throws ISE upon attempted submission; a subsequent close or
223     * closeExceptionally has no additional effect.
224     */
225    public void testClose() {
226        SubmissionPublisher<Integer> p = basicPublisher();
227        checkInitialState(p);
228        p.close();
229        assertTrue(p.isClosed());
230        assertNull(p.getClosedException());
231        try {
232            p.submit(1);
233            shouldThrow();
234        } catch (IllegalStateException success) {}
235        Throwable ex = new SPException();
236        p.closeExceptionally(ex);
237        assertTrue(p.isClosed());
238        assertNull(p.getClosedException());
239    }
240
241    /**
242     * A publisher closedExceptionally reports isClosed with the
243     * closedException and throws ISE upon attempted submission; a
244     * subsequent close or closeExceptionally has no additional
245     * effect.
246     */
247    public void testCloseExceptionally() {
248        SubmissionPublisher<Integer> p = basicPublisher();
249        checkInitialState(p);
250        Throwable ex = new SPException();
251        p.closeExceptionally(ex);
252        assertTrue(p.isClosed());
253        assertSame(p.getClosedException(), ex);
254        try {
255            p.submit(1);
256            shouldThrow();
257        } catch (IllegalStateException success) {}
258        p.close();
259        assertTrue(p.isClosed());
260        assertSame(p.getClosedException(), ex);
261    }
262
263    /**
264     * Upon subscription, the subscriber's onSubscribe is called, no
265     * other Subscriber methods are invoked, the publisher
266     * hasSubscribers, isSubscribed is true, and existing
267     * subscriptions are unaffected.
268     */
269    public void testSubscribe1() {
270        TestSubscriber s = new TestSubscriber();
271        SubmissionPublisher<Integer> p = basicPublisher();
272        p.subscribe(s);
273        assertTrue(p.hasSubscribers());
274        assertEquals(1, p.getNumberOfSubscribers());
275        assertTrue(p.getSubscribers().contains(s));
276        assertTrue(p.isSubscribed(s));
277        s.awaitSubscribe();
278        assertNotNull(s.sn);
279        assertEquals(0, s.nexts);
280        assertEquals(0, s.errors);
281        assertEquals(0, s.completes);
282        TestSubscriber s2 = new TestSubscriber();
283        p.subscribe(s2);
284        assertTrue(p.hasSubscribers());
285        assertEquals(2, p.getNumberOfSubscribers());
286        assertTrue(p.getSubscribers().contains(s));
287        assertTrue(p.getSubscribers().contains(s2));
288        assertTrue(p.isSubscribed(s));
289        assertTrue(p.isSubscribed(s2));
290        s2.awaitSubscribe();
291        assertNotNull(s2.sn);
292        assertEquals(0, s2.nexts);
293        assertEquals(0, s2.errors);
294        assertEquals(0, s2.completes);
295        p.close();
296    }
297
298    /**
299     * If closed, upon subscription, the subscriber's onComplete
300     * method is invoked
301     */
302    public void testSubscribe2() {
303        TestSubscriber s = new TestSubscriber();
304        SubmissionPublisher<Integer> p = basicPublisher();
305        p.close();
306        p.subscribe(s);
307        s.awaitComplete();
308        assertEquals(0, s.nexts);
309        assertEquals(0, s.errors);
310        assertEquals(1, s.completes, 1);
311    }
312
313    /**
314     * If closedExceptionally, upon subscription, the subscriber's
315     * onError method is invoked
316     */
317    public void testSubscribe3() {
318        TestSubscriber s = new TestSubscriber();
319        SubmissionPublisher<Integer> p = basicPublisher();
320        Throwable ex = new SPException();
321        p.closeExceptionally(ex);
322        assertTrue(p.isClosed());
323        assertSame(p.getClosedException(), ex);
324        p.subscribe(s);
325        s.awaitError();
326        assertEquals(0, s.nexts);
327        assertEquals(1, s.errors);
328    }
329
330    /**
331     * Upon attempted resubscription, the subscriber's onError is
332     * called and the subscription is cancelled.
333     */
334    public void testSubscribe4() {
335        TestSubscriber s = new TestSubscriber();
336        SubmissionPublisher<Integer> p = basicPublisher();
337        p.subscribe(s);
338        assertTrue(p.hasSubscribers());
339        assertEquals(1, p.getNumberOfSubscribers());
340        assertTrue(p.getSubscribers().contains(s));
341        assertTrue(p.isSubscribed(s));
342        s.awaitSubscribe();
343        assertNotNull(s.sn);
344        assertEquals(0, s.nexts);
345        assertEquals(0, s.errors);
346        assertEquals(0, s.completes);
347        p.subscribe(s);
348        s.awaitError();
349        assertEquals(0, s.nexts);
350        assertEquals(1, s.errors);
351        assertFalse(p.isSubscribed(s));
352    }
353
354    /**
355     * An exception thrown in onSubscribe causes onError
356     */
357    public void testSubscribe5() {
358        TestSubscriber s = new TestSubscriber();
359        SubmissionPublisher<Integer> p = basicPublisher();
360        s.throwOnCall = true;
361        try {
362            p.subscribe(s);
363        } catch (Exception ok) {}
364        s.awaitError();
365        assertEquals(0, s.nexts);
366        assertEquals(1, s.errors);
367        assertEquals(0, s.completes);
368    }
369
370    /**
371     * subscribe(null) throws NPE
372     */
373    public void testSubscribe6() {
374        SubmissionPublisher<Integer> p = basicPublisher();
375        try {
376            p.subscribe(null);
377            shouldThrow();
378        } catch (NullPointerException success) {}
379        checkInitialState(p);
380    }
381
382    /**
383     * Closing a publisher causes onComplete to subscribers
384     */
385    public void testCloseCompletes() {
386        SubmissionPublisher<Integer> p = basicPublisher();
387        TestSubscriber s1 = new TestSubscriber();
388        TestSubscriber s2 = new TestSubscriber();
389        p.subscribe(s1);
390        p.subscribe(s2);
391        p.submit(1);
392        p.close();
393        assertTrue(p.isClosed());
394        assertNull(p.getClosedException());
395        s1.awaitComplete();
396        assertEquals(1, s1.nexts);
397        assertEquals(1, s1.completes);
398        s2.awaitComplete();
399        assertEquals(1, s2.nexts);
400        assertEquals(1, s2.completes);
401    }
402
403    /**
404     * Closing a publisher exceptionally causes onError to subscribers
405     * after they are subscribed
406     */
407    public void testCloseExceptionallyError() {
408        SubmissionPublisher<Integer> p = basicPublisher();
409        TestSubscriber s1 = new TestSubscriber();
410        TestSubscriber s2 = new TestSubscriber();
411        p.subscribe(s1);
412        p.subscribe(s2);
413        p.submit(1);
414        p.closeExceptionally(new SPException());
415        assertTrue(p.isClosed());
416        s1.awaitSubscribe();
417        s1.awaitError();
418        assertTrue(s1.nexts <= 1);
419        assertEquals(1, s1.errors);
420        s2.awaitSubscribe();
421        s2.awaitError();
422        assertTrue(s2.nexts <= 1);
423        assertEquals(1, s2.errors);
424    }
425
426    /**
427     * Cancelling a subscription eventually causes no more onNexts to be issued
428     */
429    public void testCancel() {
430        SubmissionPublisher<Integer> p = basicPublisher();
431        TestSubscriber s1 = new TestSubscriber();
432        TestSubscriber s2 = new TestSubscriber();
433        p.subscribe(s1);
434        p.subscribe(s2);
435        s1.awaitSubscribe();
436        p.submit(1);
437        s1.sn.cancel();
438        for (int i = 2; i <= 20; ++i)
439            p.submit(i);
440        p.close();
441        s2.awaitComplete();
442        assertEquals(20, s2.nexts);
443        assertEquals(1, s2.completes);
444        assertTrue(s1.nexts < 20);
445        assertFalse(p.isSubscribed(s1));
446    }
447
448    /**
449     * Throwing an exception in onNext causes onError
450     */
451    public void testThrowOnNext() {
452        SubmissionPublisher<Integer> p = basicPublisher();
453        TestSubscriber s1 = new TestSubscriber();
454        TestSubscriber s2 = new TestSubscriber();
455        p.subscribe(s1);
456        p.subscribe(s2);
457        s1.awaitSubscribe();
458        p.submit(1);
459        s1.throwOnCall = true;
460        p.submit(2);
461        p.close();
462        s2.awaitComplete();
463        assertEquals(2, s2.nexts);
464        s1.awaitComplete();
465        assertEquals(1, s1.errors);
466    }
467
468    /**
469     * If a handler is supplied in constructor, it is invoked when
470     * subscriber throws an exception in onNext
471     */
472    public void testThrowOnNextHandler() {
473        AtomicInteger calls = new AtomicInteger();
474        SubmissionPublisher<Integer> p = new SubmissionPublisher<>(
475            basicExecutor, 8, (s, e) -> calls.getAndIncrement());
476        TestSubscriber s1 = new TestSubscriber();
477        TestSubscriber s2 = new TestSubscriber();
478        p.subscribe(s1);
479        p.subscribe(s2);
480        s1.awaitSubscribe();
481        p.submit(1);
482        s1.throwOnCall = true;
483        p.submit(2);
484        p.close();
485        s2.awaitComplete();
486        assertEquals(2, s2.nexts);
487        assertEquals(1, s2.completes);
488        s1.awaitError();
489        assertEquals(1, s1.errors);
490        assertEquals(1, calls.get());
491    }
492
493    /**
494     * onNext items are issued in the same order to each subscriber
495     */
496    public void testOrder() {
497        SubmissionPublisher<Integer> p = basicPublisher();
498        TestSubscriber s1 = new TestSubscriber();
499        TestSubscriber s2 = new TestSubscriber();
500        p.subscribe(s1);
501        p.subscribe(s2);
502        for (int i = 1; i <= 20; ++i)
503            p.submit(i);
504        p.close();
505        s2.awaitComplete();
506        s1.awaitComplete();
507        assertEquals(20, s2.nexts);
508        assertEquals(1, s2.completes);
509        assertEquals(20, s1.nexts);
510        assertEquals(1, s1.completes);
511    }
512
513    /**
514     * onNext is issued only if requested
515     */
516    public void testRequest1() {
517        SubmissionPublisher<Integer> p = basicPublisher();
518        TestSubscriber s1 = new TestSubscriber();
519        s1.request = false;
520        p.subscribe(s1);
521        s1.awaitSubscribe();
522        assertTrue(p.estimateMinimumDemand() == 0);
523        TestSubscriber s2 = new TestSubscriber();
524        p.subscribe(s2);
525        p.submit(1);
526        p.submit(2);
527        s2.awaitNext(1);
528        assertEquals(0, s1.nexts);
529        s1.sn.request(3);
530        p.submit(3);
531        p.close();
532        s2.awaitComplete();
533        assertEquals(3, s2.nexts);
534        assertEquals(1, s2.completes);
535        s1.awaitComplete();
536        assertTrue(s1.nexts > 0);
537        assertEquals(1, s1.completes);
538    }
539
540    /**
541     * onNext is not issued when requests become zero
542     */
543    public void testRequest2() {
544        SubmissionPublisher<Integer> p = basicPublisher();
545        TestSubscriber s1 = new TestSubscriber();
546        TestSubscriber s2 = new TestSubscriber();
547        p.subscribe(s1);
548        p.subscribe(s2);
549        s2.awaitSubscribe();
550        s1.awaitSubscribe();
551        s1.request = false;
552        p.submit(1);
553        p.submit(2);
554        p.close();
555        s2.awaitComplete();
556        assertEquals(2, s2.nexts);
557        assertEquals(1, s2.completes);
558        s1.awaitNext(1);
559        assertEquals(1, s1.nexts);
560    }
561
562    /**
563     * Non-positive request causes error
564     */
565    public void testRequest3() {
566        SubmissionPublisher<Integer> p = basicPublisher();
567        TestSubscriber s1 = new TestSubscriber();
568        TestSubscriber s2 = new TestSubscriber();
569        TestSubscriber s3 = new TestSubscriber();
570        p.subscribe(s1);
571        p.subscribe(s2);
572        p.subscribe(s3);
573        s3.awaitSubscribe();
574        s2.awaitSubscribe();
575        s1.awaitSubscribe();
576        s1.sn.request(-1L);
577        s3.sn.request(0L);
578        p.submit(1);
579        p.submit(2);
580        p.close();
581        s2.awaitComplete();
582        assertEquals(2, s2.nexts);
583        assertEquals(1, s2.completes);
584        s1.awaitError();
585        assertEquals(1, s1.errors);
586        assertTrue(s1.lastError instanceof IllegalArgumentException);
587        s3.awaitError();
588        assertEquals(1, s3.errors);
589        assertTrue(s3.lastError instanceof IllegalArgumentException);
590    }
591
592    /**
593     * estimateMinimumDemand reports 0 until request, nonzero after
594     * request, and zero again after delivery
595     */
596    public void testEstimateMinimumDemand() {
597        TestSubscriber s = new TestSubscriber();
598        SubmissionPublisher<Integer> p = basicPublisher();
599        s.request = false;
600        p.subscribe(s);
601        s.awaitSubscribe();
602        assertEquals(0, p.estimateMinimumDemand());
603        s.sn.request(1);
604        assertEquals(1, p.estimateMinimumDemand());
605        p.submit(1);
606        s.awaitNext(1);
607        assertEquals(0, p.estimateMinimumDemand());
608    }
609
610    /**
611     * submit to a publisher with no subscribers returns lag 0
612     */
613    public void testEmptySubmit() {
614        SubmissionPublisher<Integer> p = basicPublisher();
615        assertEquals(0, p.submit(1));
616    }
617
618    /**
619     * submit(null) throws NPE
620     */
621    public void testNullSubmit() {
622        SubmissionPublisher<Integer> p = basicPublisher();
623        try {
624            p.submit(null);
625            shouldThrow();
626        } catch (NullPointerException success) {}
627    }
628
629    /**
630     * submit returns number of lagged items, compatible with result
631     * of estimateMaximumLag.
632     */
633    public void testLaggedSubmit() {
634        SubmissionPublisher<Integer> p = basicPublisher();
635        TestSubscriber s1 = new TestSubscriber();
636        s1.request = false;
637        TestSubscriber s2 = new TestSubscriber();
638        s2.request = false;
639        p.subscribe(s1);
640        p.subscribe(s2);
641        s2.awaitSubscribe();
642        s1.awaitSubscribe();
643        assertEquals(1, p.submit(1));
644        assertTrue(p.estimateMaximumLag() >= 1);
645        assertTrue(p.submit(2) >= 2);
646        assertTrue(p.estimateMaximumLag() >= 2);
647        s1.sn.request(4);
648        assertTrue(p.submit(3) >= 3);
649        assertTrue(p.estimateMaximumLag() >= 3);
650        s2.sn.request(4);
651        p.submit(4);
652        p.close();
653        s2.awaitComplete();
654        assertEquals(4, s2.nexts);
655        s1.awaitComplete();
656        assertEquals(4, s2.nexts);
657    }
658
659    /**
660     * submit eventually issues requested items when buffer capacity is 1
661     */
662    public void testCap1Submit() {
663        SubmissionPublisher<Integer> p
664            = new SubmissionPublisher<>(basicExecutor, 1);
665        TestSubscriber s1 = new TestSubscriber();
666        TestSubscriber s2 = new TestSubscriber();
667        p.subscribe(s1);
668        p.subscribe(s2);
669        for (int i = 1; i <= 20; ++i) {
670            assertTrue(p.estimateMinimumDemand() <= 1);
671            assertTrue(p.submit(i) >= 0);
672        }
673        p.close();
674        s2.awaitComplete();
675        s1.awaitComplete();
676        assertEquals(20, s2.nexts);
677        assertEquals(1, s2.completes);
678        assertEquals(20, s1.nexts);
679        assertEquals(1, s1.completes);
680    }
681
682    static boolean noopHandle(AtomicInteger count) {
683        count.getAndIncrement();
684        return false;
685    }
686
687    static boolean reqHandle(AtomicInteger count, Subscriber s) {
688        count.getAndIncrement();
689        ((TestSubscriber)s).sn.request(Long.MAX_VALUE);
690        return true;
691    }
692
693    /**
694     * offer to a publisher with no subscribers returns lag 0
695     */
696    public void testEmptyOffer() {
697        SubmissionPublisher<Integer> p = basicPublisher();
698        assertEquals(0, p.offer(1, null));
699    }
700
701    /**
702     * offer(null) throws NPE
703     */
704    public void testNullOffer() {
705        SubmissionPublisher<Integer> p = basicPublisher();
706        try {
707            p.offer(null, null);
708            shouldThrow();
709        } catch (NullPointerException success) {}
710    }
711
712    /**
713     * offer returns number of lagged items if not saturated
714     */
715    public void testLaggedOffer() {
716        SubmissionPublisher<Integer> p = basicPublisher();
717        TestSubscriber s1 = new TestSubscriber();
718        s1.request = false;
719        TestSubscriber s2 = new TestSubscriber();
720        s2.request = false;
721        p.subscribe(s1);
722        p.subscribe(s2);
723        s2.awaitSubscribe();
724        s1.awaitSubscribe();
725        assertTrue(p.offer(1, null) >= 1);
726        assertTrue(p.offer(2, null) >= 2);
727        s1.sn.request(4);
728        assertTrue(p.offer(3, null) >= 3);
729        s2.sn.request(4);
730        p.offer(4, null);
731        p.close();
732        s2.awaitComplete();
733        assertEquals(4, s2.nexts);
734        s1.awaitComplete();
735        assertEquals(4, s2.nexts);
736    }
737
738    /**
739     * offer reports drops if saturated
740     */
741    public void testDroppedOffer() {
742        SubmissionPublisher<Integer> p
743            = new SubmissionPublisher<>(basicExecutor, 4);
744        TestSubscriber s1 = new TestSubscriber();
745        s1.request = false;
746        TestSubscriber s2 = new TestSubscriber();
747        s2.request = false;
748        p.subscribe(s1);
749        p.subscribe(s2);
750        s2.awaitSubscribe();
751        s1.awaitSubscribe();
752        for (int i = 1; i <= 4; ++i)
753            assertTrue(p.offer(i, null) >= 0);
754        p.offer(5, null);
755        assertTrue(p.offer(6, null) < 0);
756        s1.sn.request(64);
757        assertTrue(p.offer(7, null) < 0);
758        s2.sn.request(64);
759        p.close();
760        s2.awaitComplete();
761        assertTrue(s2.nexts >= 4);
762        s1.awaitComplete();
763        assertTrue(s1.nexts >= 4);
764    }
765
766    /**
767     * offer invokes drop handler if saturated
768     */
769    public void testHandledDroppedOffer() {
770        AtomicInteger calls = new AtomicInteger();
771        SubmissionPublisher<Integer> p
772            = new SubmissionPublisher<>(basicExecutor, 4);
773        TestSubscriber s1 = new TestSubscriber();
774        s1.request = false;
775        TestSubscriber s2 = new TestSubscriber();
776        s2.request = false;
777        p.subscribe(s1);
778        p.subscribe(s2);
779        s2.awaitSubscribe();
780        s1.awaitSubscribe();
781        for (int i = 1; i <= 4; ++i)
782            assertTrue(p.offer(i, (s, x) -> noopHandle(calls)) >= 0);
783        p.offer(4, (s, x) -> noopHandle(calls));
784        assertTrue(p.offer(6, (s, x) -> noopHandle(calls)) < 0);
785        s1.sn.request(64);
786        assertTrue(p.offer(7, (s, x) -> noopHandle(calls)) < 0);
787        s2.sn.request(64);
788        p.close();
789        s2.awaitComplete();
790        s1.awaitComplete();
791        assertTrue(calls.get() >= 4);
792    }
793
794    /**
795     * offer succeeds if drop handler forces request
796     */
797    public void testRecoveredHandledDroppedOffer() {
798        AtomicInteger calls = new AtomicInteger();
799        SubmissionPublisher<Integer> p
800            = new SubmissionPublisher<>(basicExecutor, 4);
801        TestSubscriber s1 = new TestSubscriber();
802        s1.request = false;
803        TestSubscriber s2 = new TestSubscriber();
804        s2.request = false;
805        p.subscribe(s1);
806        p.subscribe(s2);
807        s2.awaitSubscribe();
808        s1.awaitSubscribe();
809        int n = 0;
810        for (int i = 1; i <= 8; ++i) {
811            int d = p.offer(i, (s, x) -> reqHandle(calls, s));
812            n = n + 2 + (d < 0 ? d : 0);
813        }
814        p.close();
815        s2.awaitComplete();
816        s1.awaitComplete();
817        assertEquals(n, s1.nexts + s2.nexts);
818        assertTrue(calls.get() >= 2);
819    }
820
821    /**
822     * Timed offer to a publisher with no subscribers returns lag 0
823     */
824    public void testEmptyTimedOffer() {
825        SubmissionPublisher<Integer> p = basicPublisher();
826        long startTime = System.nanoTime();
827        assertEquals(0, p.offer(1, LONG_DELAY_MS, MILLISECONDS, null));
828        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
829    }
830
831    /**
832     * Timed offer with null item or TimeUnit throws NPE
833     */
834    public void testNullTimedOffer() {
835        SubmissionPublisher<Integer> p = basicPublisher();
836        long startTime = System.nanoTime();
837        try {
838            p.offer(null, LONG_DELAY_MS, MILLISECONDS, null);
839            shouldThrow();
840        } catch (NullPointerException success) {}
841        try {
842            p.offer(1, LONG_DELAY_MS, null, null);
843            shouldThrow();
844        } catch (NullPointerException success) {}
845        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
846    }
847
848    /**
849     * Timed offer returns number of lagged items if not saturated
850     */
851    public void testLaggedTimedOffer() {
852        SubmissionPublisher<Integer> p = basicPublisher();
853        TestSubscriber s1 = new TestSubscriber();
854        s1.request = false;
855        TestSubscriber s2 = new TestSubscriber();
856        s2.request = false;
857        p.subscribe(s1);
858        p.subscribe(s2);
859        s2.awaitSubscribe();
860        s1.awaitSubscribe();
861        long startTime = System.nanoTime();
862        assertTrue(p.offer(1, LONG_DELAY_MS, MILLISECONDS, null) >= 1);
863        assertTrue(p.offer(2, LONG_DELAY_MS, MILLISECONDS, null) >= 2);
864        s1.sn.request(4);
865        assertTrue(p.offer(3, LONG_DELAY_MS, MILLISECONDS, null) >= 3);
866        s2.sn.request(4);
867        p.offer(4, LONG_DELAY_MS, MILLISECONDS, null);
868        p.close();
869        s2.awaitComplete();
870        assertEquals(4, s2.nexts);
871        s1.awaitComplete();
872        assertEquals(4, s2.nexts);
873        assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS / 2);
874    }
875
876    /**
877     * Timed offer reports drops if saturated
878     */
879    public void testDroppedTimedOffer() {
880        SubmissionPublisher<Integer> p
881            = new SubmissionPublisher<>(basicExecutor, 4);
882        TestSubscriber s1 = new TestSubscriber();
883        s1.request = false;
884        TestSubscriber s2 = new TestSubscriber();
885        s2.request = false;
886        p.subscribe(s1);
887        p.subscribe(s2);
888        s2.awaitSubscribe();
889        s1.awaitSubscribe();
890        long delay = timeoutMillis();
891        for (int i = 1; i <= 4; ++i)
892            assertTrue(p.offer(i, delay, MILLISECONDS, null) >= 0);
893        long startTime = System.nanoTime();
894        assertTrue(p.offer(5, delay, MILLISECONDS, null) < 0);
895        s1.sn.request(64);
896        assertTrue(p.offer(6, delay, MILLISECONDS, null) < 0);
897        // 2 * delay should elapse but check only 1 * delay to allow timer slop
898        assertTrue(millisElapsedSince(startTime) >= delay);
899        s2.sn.request(64);
900        p.close();
901        s2.awaitComplete();
902        assertTrue(s2.nexts >= 2);
903        s1.awaitComplete();
904        assertTrue(s1.nexts >= 2);
905    }
906
907    /**
908     * Timed offer invokes drop handler if saturated
909     */
910    public void testHandledDroppedTimedOffer() {
911        AtomicInteger calls = new AtomicInteger();
912        SubmissionPublisher<Integer> p
913            = new SubmissionPublisher<>(basicExecutor, 4);
914        TestSubscriber s1 = new TestSubscriber();
915        s1.request = false;
916        TestSubscriber s2 = new TestSubscriber();
917        s2.request = false;
918        p.subscribe(s1);
919        p.subscribe(s2);
920        s2.awaitSubscribe();
921        s1.awaitSubscribe();
922        long delay = timeoutMillis();
923        for (int i = 1; i <= 4; ++i)
924            assertTrue(p.offer(i, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) >= 0);
925        long startTime = System.nanoTime();
926        assertTrue(p.offer(5, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
927        s1.sn.request(64);
928        assertTrue(p.offer(6, delay, MILLISECONDS, (s, x) -> noopHandle(calls)) < 0);
929        assertTrue(millisElapsedSince(startTime) >= delay);
930        s2.sn.request(64);
931        p.close();
932        s2.awaitComplete();
933        s1.awaitComplete();
934        assertTrue(calls.get() >= 2);
935    }
936
937    /**
938     * Timed offer succeeds if drop handler forces request
939     */
940    public void testRecoveredHandledDroppedTimedOffer() {
941        AtomicInteger calls = new AtomicInteger();
942        SubmissionPublisher<Integer> p
943            = new SubmissionPublisher<>(basicExecutor, 4);
944        TestSubscriber s1 = new TestSubscriber();
945        s1.request = false;
946        TestSubscriber s2 = new TestSubscriber();
947        s2.request = false;
948        p.subscribe(s1);
949        p.subscribe(s2);
950        s2.awaitSubscribe();
951        s1.awaitSubscribe();
952        int n = 0;
953        long delay = timeoutMillis();
954        long startTime = System.nanoTime();
955        for (int i = 1; i <= 6; ++i) {
956            int d = p.offer(i, delay, MILLISECONDS, (s, x) -> reqHandle(calls, s));
957            n = n + 2 + (d < 0 ? d : 0);
958        }
959        assertTrue(millisElapsedSince(startTime) >= delay);
960        p.close();
961        s2.awaitComplete();
962        s1.awaitComplete();
963        assertEquals(n, s1.nexts + s2.nexts);
964        assertTrue(calls.get() >= 2);
965    }
966
967    /**
968     * consume returns a CompletableFuture that is done when
969     * publisher completes
970     */
971    public void testConsume() {
972        AtomicInteger sum = new AtomicInteger();
973        SubmissionPublisher<Integer> p = basicPublisher();
974        CompletableFuture<Void> f =
975            p.consume((Integer x) -> sum.getAndAdd(x.intValue()));
976        int n = 20;
977        for (int i = 1; i <= n; ++i)
978            p.submit(i);
979        p.close();
980        f.join();
981        assertEquals((n * (n + 1)) / 2, sum.get());
982    }
983
984    /**
985     * consume(null) throws NPE
986     */
987    public void testConsumeNPE() {
988        SubmissionPublisher<Integer> p = basicPublisher();
989        try {
990            CompletableFuture<Void> f = p.consume(null);
991            shouldThrow();
992        } catch (NullPointerException success) {}
993    }
994
995    /**
996     * consume eventually stops processing published items if cancelled
997     */
998    public void testCancelledConsume() {
999        AtomicInteger count = new AtomicInteger();
1000        SubmissionPublisher<Integer> p = basicPublisher();
1001        CompletableFuture<Void> f = p.consume(x -> count.getAndIncrement());
1002        f.cancel(true);
1003        int n = 1000000; // arbitrary limit
1004        for (int i = 1; i <= n; ++i)
1005            p.submit(i);
1006        assertTrue(count.get() < n);
1007    }
1008
1009}
1010