1/*
2 * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25package java.util.stream;
26
27import java.util.Objects;
28import java.util.Spliterator;
29import java.util.function.IntFunction;
30import java.util.function.Supplier;
31
32/**
33 * Abstract base class for "pipeline" classes, which are the core
34 * implementations of the Stream interface and its primitive specializations.
35 * Manages construction and evaluation of stream pipelines.
36 *
37 * <p>An {@code AbstractPipeline} represents an initial portion of a stream
38 * pipeline, encapsulating a stream source and zero or more intermediate
39 * operations.  The individual {@code AbstractPipeline} objects are often
40 * referred to as <em>stages</em>, where each stage describes either the stream
41 * source or an intermediate operation.
42 *
43 * <p>A concrete intermediate stage is generally built from an
44 * {@code AbstractPipeline}, a shape-specific pipeline class which extends it
45 * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
46 * concrete class which extends that.  {@code AbstractPipeline} contains most of
47 * the mechanics of evaluating the pipeline, and implements methods that will be
48 * used by the operation; the shape-specific classes add helper methods for
49 * dealing with collection of results into the appropriate shape-specific
50 * containers.
51 *
52 * <p>After chaining a new intermediate operation, or executing a terminal
53 * operation, the stream is considered to be consumed, and no more intermediate
54 * or terminal operations are permitted on this stream instance.
55 *
56 * @implNote
57 * <p>For sequential streams, and parallel streams without
58 * <a href="package-summary.html#StreamOps">stateful intermediate
59 * operations</a>, pipeline evaluation is done in a single
60 * pass that "jams" all the operations together.  For parallel streams with
61 * stateful operations, execution is divided into segments, where each
62 * stateful operation marks the end of a segment, and each segment is
63 * evaluated separately and the result used as the input to the next
64 * segment.  In all cases, the source data is not consumed until a terminal
65 * operation begins.
66 *
67 * @param <E_IN>  type of input elements
68 * @param <E_OUT> type of output elements
69 * @param <S> type of the subclass implementing {@code BaseStream}
70 * @since 1.8
71 */
72abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
73        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
74    private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
75    private static final String MSG_CONSUMED = "source already consumed or closed";
76
77    /**
78     * Backlink to the head of the pipeline chain (self if this is the source
79     * stage).
80     */
81    @SuppressWarnings("rawtypes")
82    private final AbstractPipeline sourceStage;
83
84    /**
85     * The "upstream" pipeline, or null if this is the source stage.
86     */
87    @SuppressWarnings("rawtypes")
88    private final AbstractPipeline previousStage;
89
90    /**
91     * The operation flags for the intermediate operation represented by this
92     * pipeline object.
93     */
94    protected final int sourceOrOpFlags;
95
96    /**
97     * The next stage in the pipeline, or null if this is the last stage.
98     * Effectively final at the point of linking to the next pipeline.
99     */
100    @SuppressWarnings("rawtypes")
101    private AbstractPipeline nextStage;
102
103    /**
104     * The number of intermediate operations between this pipeline object
105     * and the stream source if sequential, or the previous stateful if parallel.
106     * Valid at the point of pipeline preparation for evaluation.
107     */
108    private int depth;
109
110    /**
111     * The combined source and operation flags for the source and all operations
112     * up to and including the operation represented by this pipeline object.
113     * Valid at the point of pipeline preparation for evaluation.
114     */
115    private int combinedFlags;
116
117    /**
118     * The source spliterator. Only valid for the head pipeline.
119     * Before the pipeline is consumed if non-null then {@code sourceSupplier}
120     * must be null. After the pipeline is consumed if non-null then is set to
121     * null.
122     */
123    private Spliterator<?> sourceSpliterator;
124
125    /**
126     * The source supplier. Only valid for the head pipeline. Before the
127     * pipeline is consumed if non-null then {@code sourceSpliterator} must be
128     * null. After the pipeline is consumed if non-null then is set to null.
129     */
130    private Supplier<? extends Spliterator<?>> sourceSupplier;
131
132    /**
133     * True if this pipeline has been linked or consumed
134     */
135    private boolean linkedOrConsumed;
136
137    /**
138     * True if there are any stateful ops in the pipeline; only valid for the
139     * source stage.
140     */
141    private boolean sourceAnyStateful;
142
143    private Runnable sourceCloseAction;
144
145    /**
146     * True if pipeline is parallel, otherwise the pipeline is sequential; only
147     * valid for the source stage.
148     */
149    private boolean parallel;
150
151    /**
152     * Constructor for the head of a stream pipeline.
153     *
154     * @param source {@code Supplier<Spliterator>} describing the stream source
155     * @param sourceFlags The source flags for the stream source, described in
156     * {@link StreamOpFlag}
157     * @param parallel True if the pipeline is parallel
158     */
159    AbstractPipeline(Supplier<? extends Spliterator<?>> source,
160                     int sourceFlags, boolean parallel) {
161        this.previousStage = null;
162        this.sourceSupplier = source;
163        this.sourceStage = this;
164        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
165        // The following is an optimization of:
166        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
167        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
168        this.depth = 0;
169        this.parallel = parallel;
170    }
171
172    /**
173     * Constructor for the head of a stream pipeline.
174     *
175     * @param source {@code Spliterator} describing the stream source
176     * @param sourceFlags the source flags for the stream source, described in
177     * {@link StreamOpFlag}
178     * @param parallel {@code true} if the pipeline is parallel
179     */
180    AbstractPipeline(Spliterator<?> source,
181                     int sourceFlags, boolean parallel) {
182        this.previousStage = null;
183        this.sourceSpliterator = source;
184        this.sourceStage = this;
185        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
186        // The following is an optimization of:
187        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
188        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
189        this.depth = 0;
190        this.parallel = parallel;
191    }
192
193    /**
194     * Constructor for appending an intermediate operation stage onto an
195     * existing pipeline.
196     *
197     * @param previousStage the upstream pipeline stage
198     * @param opFlags the operation flags for the new stage, described in
199     * {@link StreamOpFlag}
200     */
201    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
202        if (previousStage.linkedOrConsumed)
203            throw new IllegalStateException(MSG_STREAM_LINKED);
204        previousStage.linkedOrConsumed = true;
205        previousStage.nextStage = this;
206
207        this.previousStage = previousStage;
208        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
209        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
210        this.sourceStage = previousStage.sourceStage;
211        if (opIsStateful())
212            sourceStage.sourceAnyStateful = true;
213        this.depth = previousStage.depth + 1;
214    }
215
216
217    // Terminal evaluation methods
218
219    /**
220     * Evaluate the pipeline with a terminal operation to produce a result.
221     *
222     * @param <R> the type of result
223     * @param terminalOp the terminal operation to be applied to the pipeline.
224     * @return the result
225     */
226    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
227        assert getOutputShape() == terminalOp.inputShape();
228        if (linkedOrConsumed)
229            throw new IllegalStateException(MSG_STREAM_LINKED);
230        linkedOrConsumed = true;
231
232        return isParallel()
233               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
234               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
235    }
236
237    /**
238     * Collect the elements output from the pipeline stage.
239     *
240     * @param generator the array generator to be used to create array instances
241     * @return a flat array-backed Node that holds the collected output elements
242     */
243    @SuppressWarnings("unchecked")
244    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
245        if (linkedOrConsumed)
246            throw new IllegalStateException(MSG_STREAM_LINKED);
247        linkedOrConsumed = true;
248
249        // If the last intermediate operation is stateful then
250        // evaluate directly to avoid an extra collection step
251        if (isParallel() && previousStage != null && opIsStateful()) {
252            // Set the depth of this, last, pipeline stage to zero to slice the
253            // pipeline such that this operation will not be included in the
254            // upstream slice and upstream operations will not be included
255            // in this slice
256            depth = 0;
257            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
258        }
259        else {
260            return evaluate(sourceSpliterator(0), true, generator);
261        }
262    }
263
264    /**
265     * Gets the source stage spliterator if this pipeline stage is the source
266     * stage.  The pipeline is consumed after this method is called and
267     * returns successfully.
268     *
269     * @return the source stage spliterator
270     * @throws IllegalStateException if this pipeline stage is not the source
271     *         stage.
272     */
273    @SuppressWarnings("unchecked")
274    final Spliterator<E_OUT> sourceStageSpliterator() {
275        if (this != sourceStage)
276            throw new IllegalStateException();
277
278        if (linkedOrConsumed)
279            throw new IllegalStateException(MSG_STREAM_LINKED);
280        linkedOrConsumed = true;
281
282        if (sourceStage.sourceSpliterator != null) {
283            @SuppressWarnings("unchecked")
284            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
285            sourceStage.sourceSpliterator = null;
286            return s;
287        }
288        else if (sourceStage.sourceSupplier != null) {
289            @SuppressWarnings("unchecked")
290            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
291            sourceStage.sourceSupplier = null;
292            return s;
293        }
294        else {
295            throw new IllegalStateException(MSG_CONSUMED);
296        }
297    }
298
299    // BaseStream
300
301    @Override
302    @SuppressWarnings("unchecked")
303    public final S sequential() {
304        sourceStage.parallel = false;
305        return (S) this;
306    }
307
308    @Override
309    @SuppressWarnings("unchecked")
310    public final S parallel() {
311        sourceStage.parallel = true;
312        return (S) this;
313    }
314
315    @Override
316    public void close() {
317        linkedOrConsumed = true;
318        sourceSupplier = null;
319        sourceSpliterator = null;
320        if (sourceStage.sourceCloseAction != null) {
321            Runnable closeAction = sourceStage.sourceCloseAction;
322            sourceStage.sourceCloseAction = null;
323            closeAction.run();
324        }
325    }
326
327    @Override
328    @SuppressWarnings("unchecked")
329    public S onClose(Runnable closeHandler) {
330        if (linkedOrConsumed)
331            throw new IllegalStateException(MSG_STREAM_LINKED);
332        Objects.requireNonNull(closeHandler);
333        Runnable existingHandler = sourceStage.sourceCloseAction;
334        sourceStage.sourceCloseAction =
335                (existingHandler == null)
336                ? closeHandler
337                : Streams.composeWithExceptions(existingHandler, closeHandler);
338        return (S) this;
339    }
340
341    // Primitive specialization use co-variant overrides, hence is not final
342    @Override
343    @SuppressWarnings("unchecked")
344    public Spliterator<E_OUT> spliterator() {
345        if (linkedOrConsumed)
346            throw new IllegalStateException(MSG_STREAM_LINKED);
347        linkedOrConsumed = true;
348
349        if (this == sourceStage) {
350            if (sourceStage.sourceSpliterator != null) {
351                @SuppressWarnings("unchecked")
352                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
353                sourceStage.sourceSpliterator = null;
354                return s;
355            }
356            else if (sourceStage.sourceSupplier != null) {
357                @SuppressWarnings("unchecked")
358                Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
359                sourceStage.sourceSupplier = null;
360                return lazySpliterator(s);
361            }
362            else {
363                throw new IllegalStateException(MSG_CONSUMED);
364            }
365        }
366        else {
367            return wrap(this, () -> sourceSpliterator(0), isParallel());
368        }
369    }
370
371    @Override
372    public final boolean isParallel() {
373        return sourceStage.parallel;
374    }
375
376
377    /**
378     * Returns the composition of stream flags of the stream source and all
379     * intermediate operations.
380     *
381     * @return the composition of stream flags of the stream source and all
382     *         intermediate operations
383     * @see StreamOpFlag
384     */
385    final int getStreamFlags() {
386        return StreamOpFlag.toStreamFlags(combinedFlags);
387    }
388
389    /**
390     * Get the source spliterator for this pipeline stage.  For a sequential or
391     * stateless parallel pipeline, this is the source spliterator.  For a
392     * stateful parallel pipeline, this is a spliterator describing the results
393     * of all computations up to and including the most recent stateful
394     * operation.
395     */
396    @SuppressWarnings("unchecked")
397    private Spliterator<?> sourceSpliterator(int terminalFlags) {
398        // Get the source spliterator of the pipeline
399        Spliterator<?> spliterator = null;
400        if (sourceStage.sourceSpliterator != null) {
401            spliterator = sourceStage.sourceSpliterator;
402            sourceStage.sourceSpliterator = null;
403        }
404        else if (sourceStage.sourceSupplier != null) {
405            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
406            sourceStage.sourceSupplier = null;
407        }
408        else {
409            throw new IllegalStateException(MSG_CONSUMED);
410        }
411
412        if (isParallel() && sourceStage.sourceAnyStateful) {
413            // Adapt the source spliterator, evaluating each stateful op
414            // in the pipeline up to and including this pipeline stage.
415            // The depth and flags of each pipeline stage are adjusted accordingly.
416            int depth = 1;
417            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
418                 u != e;
419                 u = p, p = p.nextStage) {
420
421                int thisOpFlags = p.sourceOrOpFlags;
422                if (p.opIsStateful()) {
423                    depth = 0;
424
425                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
426                        // Clear the short circuit flag for next pipeline stage
427                        // This stage encapsulates short-circuiting, the next
428                        // stage may not have any short-circuit operations, and
429                        // if so spliterator.forEachRemaining should be used
430                        // for traversal
431                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
432                    }
433
434                    spliterator = p.opEvaluateParallelLazy(u, spliterator);
435
436                    // Inject or clear SIZED on the source pipeline stage
437                    // based on the stage's spliterator
438                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
439                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
440                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
441                }
442                p.depth = depth++;
443                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
444            }
445        }
446
447        if (terminalFlags != 0)  {
448            // Apply flags from the terminal operation to last pipeline stage
449            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
450        }
451
452        return spliterator;
453    }
454
455    // PipelineHelper
456
457    @Override
458    final StreamShape getSourceShape() {
459        @SuppressWarnings("rawtypes")
460        AbstractPipeline p = AbstractPipeline.this;
461        while (p.depth > 0) {
462            p = p.previousStage;
463        }
464        return p.getOutputShape();
465    }
466
467    @Override
468    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
469        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
470    }
471
472    @Override
473    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
474        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
475        return sink;
476    }
477
478    @Override
479    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
480        Objects.requireNonNull(wrappedSink);
481
482        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
483            wrappedSink.begin(spliterator.getExactSizeIfKnown());
484            spliterator.forEachRemaining(wrappedSink);
485            wrappedSink.end();
486        }
487        else {
488            copyIntoWithCancel(wrappedSink, spliterator);
489        }
490    }
491
492    @Override
493    @SuppressWarnings("unchecked")
494    final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
495        @SuppressWarnings({"rawtypes","unchecked"})
496        AbstractPipeline p = AbstractPipeline.this;
497        while (p.depth > 0) {
498            p = p.previousStage;
499        }
500
501        wrappedSink.begin(spliterator.getExactSizeIfKnown());
502        boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
503        wrappedSink.end();
504        return cancelled;
505    }
506
507    @Override
508    final int getStreamAndOpFlags() {
509        return combinedFlags;
510    }
511
512    final boolean isOrdered() {
513        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
514    }
515
516    @Override
517    @SuppressWarnings("unchecked")
518    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
519        Objects.requireNonNull(sink);
520
521        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
522            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
523        }
524        return (Sink<P_IN>) sink;
525    }
526
527    @Override
528    @SuppressWarnings("unchecked")
529    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
530        if (depth == 0) {
531            return (Spliterator<E_OUT>) sourceSpliterator;
532        }
533        else {
534            return wrap(this, () -> sourceSpliterator, isParallel());
535        }
536    }
537
538    @Override
539    @SuppressWarnings("unchecked")
540    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
541                                      boolean flatten,
542                                      IntFunction<E_OUT[]> generator) {
543        if (isParallel()) {
544            // @@@ Optimize if op of this pipeline stage is a stateful op
545            return evaluateToNode(this, spliterator, flatten, generator);
546        }
547        else {
548            Node.Builder<E_OUT> nb = makeNodeBuilder(
549                    exactOutputSizeIfKnown(spliterator), generator);
550            return wrapAndCopyInto(nb, spliterator).build();
551        }
552    }
553
554
555    // Shape-specific abstract methods, implemented by XxxPipeline classes
556
557    /**
558     * Get the output shape of the pipeline.  If the pipeline is the head,
559     * then it's output shape corresponds to the shape of the source.
560     * Otherwise, it's output shape corresponds to the output shape of the
561     * associated operation.
562     *
563     * @return the output shape
564     */
565    abstract StreamShape getOutputShape();
566
567    /**
568     * Collect elements output from a pipeline into a Node that holds elements
569     * of this shape.
570     *
571     * @param helper the pipeline helper describing the pipeline stages
572     * @param spliterator the source spliterator
573     * @param flattenTree true if the returned node should be flattened
574     * @param generator the array generator
575     * @return a Node holding the output of the pipeline
576     */
577    abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
578                                               Spliterator<P_IN> spliterator,
579                                               boolean flattenTree,
580                                               IntFunction<E_OUT[]> generator);
581
582    /**
583     * Create a spliterator that wraps a source spliterator, compatible with
584     * this stream shape, and operations associated with a {@link
585     * PipelineHelper}.
586     *
587     * @param ph the pipeline helper describing the pipeline stages
588     * @param supplier the supplier of a spliterator
589     * @return a wrapping spliterator compatible with this shape
590     */
591    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
592                                            Supplier<Spliterator<P_IN>> supplier,
593                                            boolean isParallel);
594
595    /**
596     * Create a lazy spliterator that wraps and obtains the supplied the
597     * spliterator when a method is invoked on the lazy spliterator.
598     * @param supplier the supplier of a spliterator
599     */
600    abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
601
602    /**
603     * Traverse the elements of a spliterator compatible with this stream shape,
604     * pushing those elements into a sink.   If the sink requests cancellation,
605     * no further elements will be pulled or pushed.
606     *
607     * @param spliterator the spliterator to pull elements from
608     * @param sink the sink to push elements to
609     * @return true if the cancellation was requested
610     */
611    abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
612
613    /**
614     * Make a node builder compatible with this stream shape.
615     *
616     * @param exactSizeIfKnown if {@literal >=0}, then a node builder will be
617     * created that has a fixed capacity of at most sizeIfKnown elements. If
618     * {@literal < 0}, then the node builder has an unfixed capacity. A fixed
619     * capacity node builder will throw exceptions if an element is added after
620     * builder has reached capacity, or is built before the builder has reached
621     * capacity.
622     *
623     * @param generator the array generator to be used to create instances of a
624     * T[] array. For implementations supporting primitive nodes, this parameter
625     * may be ignored.
626     * @return a node builder
627     */
628    @Override
629    abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
630                                                 IntFunction<E_OUT[]> generator);
631
632
633    // Op-specific abstract methods, implemented by the operation class
634
635    /**
636     * Returns whether this operation is stateful or not.  If it is stateful,
637     * then the method
638     * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)}
639     * must be overridden.
640     *
641     * @return {@code true} if this operation is stateful
642     */
643    abstract boolean opIsStateful();
644
645    /**
646     * Accepts a {@code Sink} which will receive the results of this operation,
647     * and return a {@code Sink} which accepts elements of the input type of
648     * this operation and which performs the operation, passing the results to
649     * the provided {@code Sink}.
650     *
651     * @apiNote
652     * The implementation may use the {@code flags} parameter to optimize the
653     * sink wrapping.  For example, if the input is already {@code DISTINCT},
654     * the implementation for the {@code Stream#distinct()} method could just
655     * return the sink it was passed.
656     *
657     * @param flags The combined stream and operation flags up to, but not
658     *        including, this operation
659     * @param sink sink to which elements should be sent after processing
660     * @return a sink which accepts elements, perform the operation upon
661     *         each element, and passes the results (if any) to the provided
662     *         {@code Sink}.
663     */
664    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
665
666    /**
667     * Performs a parallel evaluation of the operation using the specified
668     * {@code PipelineHelper} which describes the upstream intermediate
669     * operations.  Only called on stateful operations.  If {@link
670     * #opIsStateful()} returns true then implementations must override the
671     * default implementation.
672     *
673     * @implSpec The default implementation always throw
674     * {@code UnsupportedOperationException}.
675     *
676     * @param helper the pipeline helper describing the pipeline stages
677     * @param spliterator the source {@code Spliterator}
678     * @param generator the array generator
679     * @return a {@code Node} describing the result of the evaluation
680     */
681    <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
682                                          Spliterator<P_IN> spliterator,
683                                          IntFunction<E_OUT[]> generator) {
684        throw new UnsupportedOperationException("Parallel evaluation is not supported");
685    }
686
687    /**
688     * Returns a {@code Spliterator} describing a parallel evaluation of the
689     * operation, using the specified {@code PipelineHelper} which describes the
690     * upstream intermediate operations.  Only called on stateful operations.
691     * It is not necessary (though acceptable) to do a full computation of the
692     * result here; it is preferable, if possible, to describe the result via a
693     * lazily evaluated spliterator.
694     *
695     * @implSpec The default implementation behaves as if:
696     * <pre>{@code
697     *     return evaluateParallel(helper, i -> (E_OUT[]) new
698     * Object[i]).spliterator();
699     * }</pre>
700     * and is suitable for implementations that cannot do better than a full
701     * synchronous evaluation.
702     *
703     * @param helper the pipeline helper
704     * @param spliterator the source {@code Spliterator}
705     * @return a {@code Spliterator} describing the result of the evaluation
706     */
707    @SuppressWarnings("unchecked")
708    <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
709                                                     Spliterator<P_IN> spliterator) {
710        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
711    }
712}
713