1/*
2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Objects;
28import java.util.Optional;
29import java.util.OptionalDouble;
30import java.util.OptionalInt;
31import java.util.OptionalLong;
32import java.util.Spliterator;
33import java.util.concurrent.CountedCompleter;
34import java.util.function.BiConsumer;
35import java.util.function.BiFunction;
36import java.util.function.BinaryOperator;
37import java.util.function.DoubleBinaryOperator;
38import java.util.function.IntBinaryOperator;
39import java.util.function.LongBinaryOperator;
40import java.util.function.ObjDoubleConsumer;
41import java.util.function.ObjIntConsumer;
42import java.util.function.ObjLongConsumer;
43import java.util.function.Supplier;
44
45/**
46 * Factory for creating instances of {@code TerminalOp} that implement
47 * reductions.
48 *
49 * @since 1.8
50 */
51final class ReduceOps {
52
53    private ReduceOps() { }
54
55    /**
56     * Constructs a {@code TerminalOp} that implements a functional reduce on
57     * reference values.
58     *
59     * @param <T> the type of the input elements
60     * @param <U> the type of the result
61     * @param seed the identity element for the reduction
62     * @param reducer the accumulating function that incorporates an additional
63     *        input element into the result
64     * @param combiner the combining function that combines two intermediate
65     *        results
66     * @return a {@code TerminalOp} implementing the reduction
67     */
68    public static <T, U> TerminalOp<T, U>
69    makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
70        Objects.requireNonNull(reducer);
71        Objects.requireNonNull(combiner);
72        class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
73            @Override
74            public void begin(long size) {
75                state = seed;
76            }
77
78            @Override
79            public void accept(T t) {
80                state = reducer.apply(state, t);
81            }
82
83            @Override
84            public void combine(ReducingSink other) {
85                state = combiner.apply(state, other.state);
86            }
87        }
88        return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
89            @Override
90            public ReducingSink makeSink() {
91                return new ReducingSink();
92            }
93        };
94    }
95
96    /**
97     * Constructs a {@code TerminalOp} that implements a functional reduce on
98     * reference values producing an optional reference result.
99     *
100     * @param <T> The type of the input elements, and the type of the result
101     * @param operator The reducing function
102     * @return A {@code TerminalOp} implementing the reduction
103     */
104    public static <T> TerminalOp<T, Optional<T>>
105    makeRef(BinaryOperator<T> operator) {
106        Objects.requireNonNull(operator);
107        class ReducingSink
108                implements AccumulatingSink<T, Optional<T>, ReducingSink> {
109            private boolean empty;
110            private T state;
111
112            public void begin(long size) {
113                empty = true;
114                state = null;
115            }
116
117            @Override
118            public void accept(T t) {
119                if (empty) {
120                    empty = false;
121                    state = t;
122                } else {
123                    state = operator.apply(state, t);
124                }
125            }
126
127            @Override
128            public Optional<T> get() {
129                return empty ? Optional.empty() : Optional.of(state);
130            }
131
132            @Override
133            public void combine(ReducingSink other) {
134                if (!other.empty)
135                    accept(other.state);
136            }
137        }
138        return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139            @Override
140            public ReducingSink makeSink() {
141                return new ReducingSink();
142            }
143        };
144    }
145
146    /**
147     * Constructs a {@code TerminalOp} that implements a mutable reduce on
148     * reference values.
149     *
150     * @param <T> the type of the input elements
151     * @param <I> the type of the intermediate reduction result
152     * @param collector a {@code Collector} defining the reduction
153     * @return a {@code ReduceOp} implementing the reduction
154     */
155    public static <T, I> TerminalOp<T, I>
156    makeRef(Collector<? super T, I, ?> collector) {
157        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
158        BiConsumer<I, ? super T> accumulator = collector.accumulator();
159        BinaryOperator<I> combiner = collector.combiner();
160        class ReducingSink extends Box<I>
161                implements AccumulatingSink<T, I, ReducingSink> {
162            @Override
163            public void begin(long size) {
164                state = supplier.get();
165            }
166
167            @Override
168            public void accept(T t) {
169                accumulator.accept(state, t);
170            }
171
172            @Override
173            public void combine(ReducingSink other) {
174                state = combiner.apply(state, other.state);
175            }
176        }
177        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
178            @Override
179            public ReducingSink makeSink() {
180                return new ReducingSink();
181            }
182
183            @Override
184            public int getOpFlags() {
185                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
186                       ? StreamOpFlag.NOT_ORDERED
187                       : 0;
188            }
189        };
190    }
191
192    /**
193     * Constructs a {@code TerminalOp} that implements a mutable reduce on
194     * reference values.
195     *
196     * @param <T> the type of the input elements
197     * @param <R> the type of the result
198     * @param seedFactory a factory to produce a new base accumulator
199     * @param accumulator a function to incorporate an element into an
200     *        accumulator
201     * @param reducer a function to combine an accumulator into another
202     * @return a {@code TerminalOp} implementing the reduction
203     */
204    public static <T, R> TerminalOp<T, R>
205    makeRef(Supplier<R> seedFactory,
206            BiConsumer<R, ? super T> accumulator,
207            BiConsumer<R,R> reducer) {
208        Objects.requireNonNull(seedFactory);
209        Objects.requireNonNull(accumulator);
210        Objects.requireNonNull(reducer);
211        class ReducingSink extends Box<R>
212                implements AccumulatingSink<T, R, ReducingSink> {
213            @Override
214            public void begin(long size) {
215                state = seedFactory.get();
216            }
217
218            @Override
219            public void accept(T t) {
220                accumulator.accept(state, t);
221            }
222
223            @Override
224            public void combine(ReducingSink other) {
225                reducer.accept(state, other.state);
226            }
227        }
228        return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
229            @Override
230            public ReducingSink makeSink() {
231                return new ReducingSink();
232            }
233        };
234    }
235
236    /**
237     * Constructs a {@code TerminalOp} that counts the number of stream
238     * elements.  If the size of the pipeline is known then count is the size
239     * and there is no need to evaluate the pipeline.  If the size of the
240     * pipeline is non known then count is produced, via reduction, using a
241     * {@link CountingSink}.
242     *
243     * @param <T> the type of the input elements
244     * @return a {@code TerminalOp} implementing the counting
245     */
246    public static <T> TerminalOp<T, Long>
247    makeRefCounting() {
248        return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
249            @Override
250            public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
251
252            @Override
253            public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
254                                                  Spliterator<P_IN> spliterator) {
255                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
256                    return spliterator.getExactSizeIfKnown();
257                return super.evaluateSequential(helper, spliterator);
258            }
259
260            @Override
261            public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
262                                                Spliterator<P_IN> spliterator) {
263                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
264                    return spliterator.getExactSizeIfKnown();
265                return super.evaluateParallel(helper, spliterator);
266            }
267
268            @Override
269            public int getOpFlags() {
270                return StreamOpFlag.NOT_ORDERED;
271            }
272        };
273    }
274
275    /**
276     * Constructs a {@code TerminalOp} that implements a functional reduce on
277     * {@code int} values.
278     *
279     * @param identity the identity for the combining function
280     * @param operator the combining function
281     * @return a {@code TerminalOp} implementing the reduction
282     */
283    public static TerminalOp<Integer, Integer>
284    makeInt(int identity, IntBinaryOperator operator) {
285        Objects.requireNonNull(operator);
286        class ReducingSink
287                implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
288            private int state;
289
290            @Override
291            public void begin(long size) {
292                state = identity;
293            }
294
295            @Override
296            public void accept(int t) {
297                state = operator.applyAsInt(state, t);
298            }
299
300            @Override
301            public Integer get() {
302                return state;
303            }
304
305            @Override
306            public void combine(ReducingSink other) {
307                accept(other.state);
308            }
309        }
310        return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
311            @Override
312            public ReducingSink makeSink() {
313                return new ReducingSink();
314            }
315        };
316    }
317
318    /**
319     * Constructs a {@code TerminalOp} that implements a functional reduce on
320     * {@code int} values, producing an optional integer result.
321     *
322     * @param operator the combining function
323     * @return a {@code TerminalOp} implementing the reduction
324     */
325    public static TerminalOp<Integer, OptionalInt>
326    makeInt(IntBinaryOperator operator) {
327        Objects.requireNonNull(operator);
328        class ReducingSink
329                implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
330            private boolean empty;
331            private int state;
332
333            public void begin(long size) {
334                empty = true;
335                state = 0;
336            }
337
338            @Override
339            public void accept(int t) {
340                if (empty) {
341                    empty = false;
342                    state = t;
343                }
344                else {
345                    state = operator.applyAsInt(state, t);
346                }
347            }
348
349            @Override
350            public OptionalInt get() {
351                return empty ? OptionalInt.empty() : OptionalInt.of(state);
352            }
353
354            @Override
355            public void combine(ReducingSink other) {
356                if (!other.empty)
357                    accept(other.state);
358            }
359        }
360        return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
361            @Override
362            public ReducingSink makeSink() {
363                return new ReducingSink();
364            }
365        };
366    }
367
368    /**
369     * Constructs a {@code TerminalOp} that implements a mutable reduce on
370     * {@code int} values.
371     *
372     * @param <R> The type of the result
373     * @param supplier a factory to produce a new accumulator of the result type
374     * @param accumulator a function to incorporate an int into an
375     *        accumulator
376     * @param combiner a function to combine an accumulator into another
377     * @return A {@code ReduceOp} implementing the reduction
378     */
379    public static <R> TerminalOp<Integer, R>
380    makeInt(Supplier<R> supplier,
381            ObjIntConsumer<R> accumulator,
382            BinaryOperator<R> combiner) {
383        Objects.requireNonNull(supplier);
384        Objects.requireNonNull(accumulator);
385        Objects.requireNonNull(combiner);
386        class ReducingSink extends Box<R>
387                implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
388            @Override
389            public void begin(long size) {
390                state = supplier.get();
391            }
392
393            @Override
394            public void accept(int t) {
395                accumulator.accept(state, t);
396            }
397
398            @Override
399            public void combine(ReducingSink other) {
400                state = combiner.apply(state, other.state);
401            }
402        }
403        return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
404            @Override
405            public ReducingSink makeSink() {
406                return new ReducingSink();
407            }
408        };
409    }
410
411    /**
412     * Constructs a {@code TerminalOp} that counts the number of stream
413     * elements.  If the size of the pipeline is known then count is the size
414     * and there is no need to evaluate the pipeline.  If the size of the
415     * pipeline is non known then count is produced, via reduction, using a
416     * {@link CountingSink}.
417     *
418     * @return a {@code TerminalOp} implementing the counting
419     */
420    public static TerminalOp<Integer, Long>
421    makeIntCounting() {
422        return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
423            @Override
424            public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
425
426            @Override
427            public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
428                                                  Spliterator<P_IN> spliterator) {
429                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
430                    return spliterator.getExactSizeIfKnown();
431                return super.evaluateSequential(helper, spliterator);
432            }
433
434            @Override
435            public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
436                                                Spliterator<P_IN> spliterator) {
437                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
438                    return spliterator.getExactSizeIfKnown();
439                return super.evaluateParallel(helper, spliterator);
440            }
441
442            @Override
443            public int getOpFlags() {
444                return StreamOpFlag.NOT_ORDERED;
445            }
446        };
447    }
448
449    /**
450     * Constructs a {@code TerminalOp} that implements a functional reduce on
451     * {@code long} values.
452     *
453     * @param identity the identity for the combining function
454     * @param operator the combining function
455     * @return a {@code TerminalOp} implementing the reduction
456     */
457    public static TerminalOp<Long, Long>
458    makeLong(long identity, LongBinaryOperator operator) {
459        Objects.requireNonNull(operator);
460        class ReducingSink
461                implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
462            private long state;
463
464            @Override
465            public void begin(long size) {
466                state = identity;
467            }
468
469            @Override
470            public void accept(long t) {
471                state = operator.applyAsLong(state, t);
472            }
473
474            @Override
475            public Long get() {
476                return state;
477            }
478
479            @Override
480            public void combine(ReducingSink other) {
481                accept(other.state);
482            }
483        }
484        return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
485            @Override
486            public ReducingSink makeSink() {
487                return new ReducingSink();
488            }
489        };
490    }
491
492    /**
493     * Constructs a {@code TerminalOp} that implements a functional reduce on
494     * {@code long} values, producing an optional long result.
495     *
496     * @param operator the combining function
497     * @return a {@code TerminalOp} implementing the reduction
498     */
499    public static TerminalOp<Long, OptionalLong>
500    makeLong(LongBinaryOperator operator) {
501        Objects.requireNonNull(operator);
502        class ReducingSink
503                implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
504            private boolean empty;
505            private long state;
506
507            public void begin(long size) {
508                empty = true;
509                state = 0;
510            }
511
512            @Override
513            public void accept(long t) {
514                if (empty) {
515                    empty = false;
516                    state = t;
517                }
518                else {
519                    state = operator.applyAsLong(state, t);
520                }
521            }
522
523            @Override
524            public OptionalLong get() {
525                return empty ? OptionalLong.empty() : OptionalLong.of(state);
526            }
527
528            @Override
529            public void combine(ReducingSink other) {
530                if (!other.empty)
531                    accept(other.state);
532            }
533        }
534        return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
535            @Override
536            public ReducingSink makeSink() {
537                return new ReducingSink();
538            }
539        };
540    }
541
542    /**
543     * Constructs a {@code TerminalOp} that implements a mutable reduce on
544     * {@code long} values.
545     *
546     * @param <R> the type of the result
547     * @param supplier a factory to produce a new accumulator of the result type
548     * @param accumulator a function to incorporate an int into an
549     *        accumulator
550     * @param combiner a function to combine an accumulator into another
551     * @return a {@code TerminalOp} implementing the reduction
552     */
553    public static <R> TerminalOp<Long, R>
554    makeLong(Supplier<R> supplier,
555             ObjLongConsumer<R> accumulator,
556             BinaryOperator<R> combiner) {
557        Objects.requireNonNull(supplier);
558        Objects.requireNonNull(accumulator);
559        Objects.requireNonNull(combiner);
560        class ReducingSink extends Box<R>
561                implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
562            @Override
563            public void begin(long size) {
564                state = supplier.get();
565            }
566
567            @Override
568            public void accept(long t) {
569                accumulator.accept(state, t);
570            }
571
572            @Override
573            public void combine(ReducingSink other) {
574                state = combiner.apply(state, other.state);
575            }
576        }
577        return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
578            @Override
579            public ReducingSink makeSink() {
580                return new ReducingSink();
581            }
582        };
583    }
584
585    /**
586     * Constructs a {@code TerminalOp} that counts the number of stream
587     * elements.  If the size of the pipeline is known then count is the size
588     * and there is no need to evaluate the pipeline.  If the size of the
589     * pipeline is non known then count is produced, via reduction, using a
590     * {@link CountingSink}.
591     *
592     * @return a {@code TerminalOp} implementing the counting
593     */
594    public static TerminalOp<Long, Long>
595    makeLongCounting() {
596        return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
597            @Override
598            public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
599
600            @Override
601            public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
602                                                  Spliterator<P_IN> spliterator) {
603                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
604                    return spliterator.getExactSizeIfKnown();
605                return super.evaluateSequential(helper, spliterator);
606            }
607
608            @Override
609            public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
610                                                Spliterator<P_IN> spliterator) {
611                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
612                    return spliterator.getExactSizeIfKnown();
613                return super.evaluateParallel(helper, spliterator);
614            }
615
616            @Override
617            public int getOpFlags() {
618                return StreamOpFlag.NOT_ORDERED;
619            }
620        };
621    }
622
623    /**
624     * Constructs a {@code TerminalOp} that implements a functional reduce on
625     * {@code double} values.
626     *
627     * @param identity the identity for the combining function
628     * @param operator the combining function
629     * @return a {@code TerminalOp} implementing the reduction
630     */
631    public static TerminalOp<Double, Double>
632    makeDouble(double identity, DoubleBinaryOperator operator) {
633        Objects.requireNonNull(operator);
634        class ReducingSink
635                implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
636            private double state;
637
638            @Override
639            public void begin(long size) {
640                state = identity;
641            }
642
643            @Override
644            public void accept(double t) {
645                state = operator.applyAsDouble(state, t);
646            }
647
648            @Override
649            public Double get() {
650                return state;
651            }
652
653            @Override
654            public void combine(ReducingSink other) {
655                accept(other.state);
656            }
657        }
658        return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
659            @Override
660            public ReducingSink makeSink() {
661                return new ReducingSink();
662            }
663        };
664    }
665
666    /**
667     * Constructs a {@code TerminalOp} that implements a functional reduce on
668     * {@code double} values, producing an optional double result.
669     *
670     * @param operator the combining function
671     * @return a {@code TerminalOp} implementing the reduction
672     */
673    public static TerminalOp<Double, OptionalDouble>
674    makeDouble(DoubleBinaryOperator operator) {
675        Objects.requireNonNull(operator);
676        class ReducingSink
677                implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
678            private boolean empty;
679            private double state;
680
681            public void begin(long size) {
682                empty = true;
683                state = 0;
684            }
685
686            @Override
687            public void accept(double t) {
688                if (empty) {
689                    empty = false;
690                    state = t;
691                }
692                else {
693                    state = operator.applyAsDouble(state, t);
694                }
695            }
696
697            @Override
698            public OptionalDouble get() {
699                return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
700            }
701
702            @Override
703            public void combine(ReducingSink other) {
704                if (!other.empty)
705                    accept(other.state);
706            }
707        }
708        return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
709            @Override
710            public ReducingSink makeSink() {
711                return new ReducingSink();
712            }
713        };
714    }
715
716    /**
717     * Constructs a {@code TerminalOp} that implements a mutable reduce on
718     * {@code double} values.
719     *
720     * @param <R> the type of the result
721     * @param supplier a factory to produce a new accumulator of the result type
722     * @param accumulator a function to incorporate an int into an
723     *        accumulator
724     * @param combiner a function to combine an accumulator into another
725     * @return a {@code TerminalOp} implementing the reduction
726     */
727    public static <R> TerminalOp<Double, R>
728    makeDouble(Supplier<R> supplier,
729               ObjDoubleConsumer<R> accumulator,
730               BinaryOperator<R> combiner) {
731        Objects.requireNonNull(supplier);
732        Objects.requireNonNull(accumulator);
733        Objects.requireNonNull(combiner);
734        class ReducingSink extends Box<R>
735                implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
736            @Override
737            public void begin(long size) {
738                state = supplier.get();
739            }
740
741            @Override
742            public void accept(double t) {
743                accumulator.accept(state, t);
744            }
745
746            @Override
747            public void combine(ReducingSink other) {
748                state = combiner.apply(state, other.state);
749            }
750        }
751        return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
752            @Override
753            public ReducingSink makeSink() {
754                return new ReducingSink();
755            }
756        };
757    }
758
759    /**
760     * Constructs a {@code TerminalOp} that counts the number of stream
761     * elements.  If the size of the pipeline is known then count is the size
762     * and there is no need to evaluate the pipeline.  If the size of the
763     * pipeline is non known then count is produced, via reduction, using a
764     * {@link CountingSink}.
765     *
766     * @return a {@code TerminalOp} implementing the counting
767     */
768    public static TerminalOp<Double, Long>
769    makeDoubleCounting() {
770        return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
771            @Override
772            public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
773
774            @Override
775            public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
776                                                  Spliterator<P_IN> spliterator) {
777                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
778                    return spliterator.getExactSizeIfKnown();
779                return super.evaluateSequential(helper, spliterator);
780            }
781
782            @Override
783            public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
784                                                Spliterator<P_IN> spliterator) {
785                if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
786                    return spliterator.getExactSizeIfKnown();
787                return super.evaluateParallel(helper, spliterator);
788            }
789
790            @Override
791            public int getOpFlags() {
792                return StreamOpFlag.NOT_ORDERED;
793            }
794        };
795    }
796
797    /**
798     * A sink that counts elements
799     */
800    abstract static class CountingSink<T>
801            extends Box<Long>
802            implements AccumulatingSink<T, Long, CountingSink<T>> {
803        long count;
804
805        @Override
806        public void begin(long size) {
807            count = 0L;
808        }
809
810        @Override
811        public Long get() {
812            return count;
813        }
814
815        @Override
816        public void combine(CountingSink<T> other) {
817            count += other.count;
818        }
819
820        static final class OfRef<T> extends CountingSink<T> {
821            @Override
822            public void accept(T t) {
823                count++;
824            }
825        }
826
827        static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
828            @Override
829            public void accept(int t) {
830                count++;
831            }
832        }
833
834        static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
835            @Override
836            public void accept(long t) {
837                count++;
838            }
839        }
840
841        static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
842            @Override
843            public void accept(double t) {
844                count++;
845            }
846        }
847    }
848
849    /**
850     * A type of {@code TerminalSink} that implements an associative reducing
851     * operation on elements of type {@code T} and producing a result of type
852     * {@code R}.
853     *
854     * @param <T> the type of input element to the combining operation
855     * @param <R> the result type
856     * @param <K> the type of the {@code AccumulatingSink}.
857     */
858    private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
859            extends TerminalSink<T, R> {
860        void combine(K other);
861    }
862
863    /**
864     * State box for a single state element, used as a base class for
865     * {@code AccumulatingSink} instances
866     *
867     * @param <U> The type of the state element
868     */
869    private abstract static class Box<U> {
870        U state;
871
872        Box() {} // Avoid creation of special accessor
873
874        public U get() {
875            return state;
876        }
877    }
878
879    /**
880     * A {@code TerminalOp} that evaluates a stream pipeline and sends the
881     * output into an {@code AccumulatingSink}, which performs a reduce
882     * operation. The {@code AccumulatingSink} must represent an associative
883     * reducing operation.
884     *
885     * @param <T> the output type of the stream pipeline
886     * @param <R> the result type of the reducing operation
887     * @param <S> the type of the {@code AccumulatingSink}
888     */
889    private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
890            implements TerminalOp<T, R> {
891        private final StreamShape inputShape;
892
893        /**
894         * Create a {@code ReduceOp} of the specified stream shape which uses
895         * the specified {@code Supplier} to create accumulating sinks.
896         *
897         * @param shape The shape of the stream pipeline
898         */
899        ReduceOp(StreamShape shape) {
900            inputShape = shape;
901        }
902
903        public abstract S makeSink();
904
905        @Override
906        public StreamShape inputShape() {
907            return inputShape;
908        }
909
910        @Override
911        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
912                                           Spliterator<P_IN> spliterator) {
913            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
914        }
915
916        @Override
917        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
918                                         Spliterator<P_IN> spliterator) {
919            return new ReduceTask<>(this, helper, spliterator).invoke().get();
920        }
921    }
922
923    /**
924     * A {@code ForkJoinTask} for performing a parallel reduce operation.
925     */
926    @SuppressWarnings("serial")
927    private static final class ReduceTask<P_IN, P_OUT, R,
928                                          S extends AccumulatingSink<P_OUT, R, S>>
929            extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
930        private final ReduceOp<P_OUT, R, S> op;
931
932        ReduceTask(ReduceOp<P_OUT, R, S> op,
933                   PipelineHelper<P_OUT> helper,
934                   Spliterator<P_IN> spliterator) {
935            super(helper, spliterator);
936            this.op = op;
937        }
938
939        ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
940                   Spliterator<P_IN> spliterator) {
941            super(parent, spliterator);
942            this.op = parent.op;
943        }
944
945        @Override
946        protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
947            return new ReduceTask<>(this, spliterator);
948        }
949
950        @Override
951        protected S doLeaf() {
952            return helper.wrapAndCopyInto(op.makeSink(), spliterator);
953        }
954
955        @Override
956        public void onCompletion(CountedCompleter<?> caller) {
957            if (!isLeaf()) {
958                S leftResult = leftChild.getLocalResult();
959                leftResult.combine(rightChild.getLocalResult());
960                setLocalResult(leftResult);
961            }
962            // GC spliterator, left and right child
963            super.onCompletion(caller);
964        }
965    }
966}
967