1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Comparator;
28 import java.util.Iterator;
29 import java.util.Objects;
30 import java.util.Optional;
31 import java.util.Spliterator;
32 import java.util.Spliterators;
33 import java.util.function.BiConsumer;
34 import java.util.function.BiFunction;
35 import java.util.function.BinaryOperator;
36 import java.util.function.Consumer;
37 import java.util.function.DoubleConsumer;
38 import java.util.function.Function;
39 import java.util.function.IntConsumer;
40 import java.util.function.IntFunction;
41 import java.util.function.LongConsumer;
42 import java.util.function.Predicate;
43 import java.util.function.Supplier;
44 import java.util.function.ToDoubleFunction;
45 import java.util.function.ToIntFunction;
46 import java.util.function.ToLongFunction;
47 
48 /**
49  * Abstract base class for an intermediate pipeline stage or pipeline source
50  * stage implementing whose elements are of type {@code U}.
51  *
52  * @param <P_IN> type of elements in the upstream source
53  * @param <P_OUT> type of elements in produced by this stage
54  *
55  * @since 1.8
56  * @hide Visible for CTS testing only (OpenJDK8 tests).
57  */
58 public abstract class ReferencePipeline<P_IN, P_OUT>
59         extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
60         implements Stream<P_OUT>  {
61 
62     /**
63      * Constructor for the head of a stream pipeline.
64      *
65      * @param source {@code Supplier<Spliterator>} describing the stream source
66      * @param sourceFlags the source flags for the stream source, described in
67      *        {@link StreamOpFlag}
68      * @param parallel {@code true} if the pipeline is parallel
69      */
ReferencePipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel)70     ReferencePipeline(Supplier<? extends Spliterator<?>> source,
71                       int sourceFlags, boolean parallel) {
72         super(source, sourceFlags, parallel);
73     }
74 
75     /**
76      * Constructor for the head of a stream pipeline.
77      *
78      * @param source {@code Spliterator} describing the stream source
79      * @param sourceFlags The source flags for the stream source, described in
80      *        {@link StreamOpFlag}
81      * @param parallel {@code true} if the pipeline is parallel
82      */
ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel)83     ReferencePipeline(Spliterator<?> source,
84                       int sourceFlags, boolean parallel) {
85         super(source, sourceFlags, parallel);
86     }
87 
88     /**
89      * Constructor for appending an intermediate operation onto an existing
90      * pipeline.
91      *
92      * @param upstream the upstream element source.
93      */
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags)94     ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
95         super(upstream, opFlags);
96     }
97 
98     // Shape-specific methods
99 
100     @Override
getOutputShape()101     public final StreamShape getOutputShape() {
102         return StreamShape.REFERENCE;
103     }
104 
105     @Override
evaluateToNode(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<P_OUT[]> generator)106     public final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
107                                         Spliterator<P_IN> spliterator,
108                                         boolean flattenTree,
109                                         IntFunction<P_OUT[]> generator) {
110         return Nodes.collect(helper, spliterator, flattenTree, generator);
111     }
112 
113     @Override
wrap(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)114     public final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
115                                      Supplier<Spliterator<P_IN>> supplier,
116                                      boolean isParallel) {
117         return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
118     }
119 
120     @Override
lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier)121     public final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
122         return new StreamSpliterators.DelegatingSpliterator<>(supplier);
123     }
124 
125     @Override
forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink)126     public final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
127         do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
128     }
129 
130     @Override
makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator)131     public final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
132         return Nodes.builder(exactSizeIfKnown, generator);
133     }
134 
135 
136     // BaseStream
137 
138     @Override
iterator()139     public final Iterator<P_OUT> iterator() {
140         return Spliterators.iterator(spliterator());
141     }
142 
143 
144     // Stream
145 
146     // Stateless intermediate operations from Stream
147 
148     @Override
unordered()149     public Stream<P_OUT> unordered() {
150         if (!isOrdered())
151             return this;
152         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
153             @Override
154             public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
155                 return sink;
156             }
157         };
158     }
159 
160     @Override
161     public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
162         Objects.requireNonNull(predicate);
163         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
164                                      StreamOpFlag.NOT_SIZED) {
165             @Override
166             public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
167                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
168                     @Override
169                     public void begin(long size) {
170                         downstream.begin(-1);
171                     }
172 
173                     @Override
174                     public void accept(P_OUT u) {
175                         if (predicate.test(u))
176                             downstream.accept(u);
177                     }
178                 };
179             }
180         };
181     }
182 
183     @Override
184     @SuppressWarnings("unchecked")
185     public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
186         Objects.requireNonNull(mapper);
187         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
188                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
189             @Override
190             public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
191                 return new Sink.ChainedReference<P_OUT, R>(sink) {
192                     @Override
193                     public void accept(P_OUT u) {
194                         downstream.accept(mapper.apply(u));
195                     }
196                 };
197             }
198         };
199     }
200 
201     @Override
202     public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
203         Objects.requireNonNull(mapper);
204         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
205                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
206             @Override
207             public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
208                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
209                     @Override
210                     public void accept(P_OUT u) {
211                         downstream.accept(mapper.applyAsInt(u));
212                     }
213                 };
214             }
215         };
216     }
217 
218     @Override
219     public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
220         Objects.requireNonNull(mapper);
221         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
222                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
223             @Override
224             public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
225                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
226                     @Override
227                     public void accept(P_OUT u) {
228                         downstream.accept(mapper.applyAsLong(u));
229                     }
230                 };
231             }
232         };
233     }
234 
235     @Override
236     public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
237         Objects.requireNonNull(mapper);
238         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
239                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
240             @Override
241             public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
242                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
243                     @Override
244                     public void accept(P_OUT u) {
245                         downstream.accept(mapper.applyAsDouble(u));
246                     }
247                 };
248             }
249         };
250     }
251 
252     @Override
253     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
254         Objects.requireNonNull(mapper);
255         // We can do better than this, by polling cancellationRequested when stream is infinite
256         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
257                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
258             @Override
259             public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
260                 return new Sink.ChainedReference<P_OUT, R>(sink) {
261                     @Override
262                     public void begin(long size) {
263                         downstream.begin(-1);
264                     }
265 
266                     @Override
267                     public void accept(P_OUT u) {
268                         try (Stream<? extends R> result = mapper.apply(u)) {
269                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
270                             if (result != null)
271                                 result.sequential().forEach(downstream);
272                         }
273                     }
274                 };
275             }
276         };
277     }
278 
279     @Override
280     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
281         Objects.requireNonNull(mapper);
282         // We can do better than this, by polling cancellationRequested when stream is infinite
283         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
284                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
285             @Override
286             public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
287                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
288                     IntConsumer downstreamAsInt = downstream::accept;
289                     @Override
290                     public void begin(long size) {
291                         downstream.begin(-1);
292                     }
293 
294                     @Override
295                     public void accept(P_OUT u) {
296                         try (IntStream result = mapper.apply(u)) {
297                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
298                             if (result != null)
299                                 result.sequential().forEach(downstreamAsInt);
300                         }
301                     }
302                 };
303             }
304         };
305     }
306 
307     @Override
308     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
309         Objects.requireNonNull(mapper);
310         // We can do better than this, by polling cancellationRequested when stream is infinite
311         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
312                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
313             @Override
314             public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
315                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
316                     DoubleConsumer downstreamAsDouble = downstream::accept;
317                     @Override
318                     public void begin(long size) {
319                         downstream.begin(-1);
320                     }
321 
322                     @Override
323                     public void accept(P_OUT u) {
324                         try (DoubleStream result = mapper.apply(u)) {
325                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
326                             if (result != null)
327                                 result.sequential().forEach(downstreamAsDouble);
328                         }
329                     }
330                 };
331             }
332         };
333     }
334 
335     @Override
336     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
337         Objects.requireNonNull(mapper);
338         // We can do better than this, by polling cancellationRequested when stream is infinite
339         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
340                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
341             @Override
342             public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
343                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
344                     LongConsumer downstreamAsLong = downstream::accept;
345                     @Override
346                     public void begin(long size) {
347                         downstream.begin(-1);
348                     }
349 
350                     @Override
351                     public void accept(P_OUT u) {
352                         try (LongStream result = mapper.apply(u)) {
353                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
354                             if (result != null)
355                                 result.sequential().forEach(downstreamAsLong);
356                         }
357                     }
358                 };
359             }
360         };
361     }
362 
363     @Override
364     public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
365         Objects.requireNonNull(action);
366         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
367                                      0) {
368             @Override
369             public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
370                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
371                     @Override
372                     public void accept(P_OUT u) {
373                         action.accept(u);
374                         downstream.accept(u);
375                     }
376                 };
377             }
378         };
379     }
380 
381     // Stateful intermediate operations from Stream
382 
383     @Override
384     public final Stream<P_OUT> distinct() {
385         return DistinctOps.makeRef(this);
386     }
387 
388     @Override
389     public final Stream<P_OUT> sorted() {
390         return SortedOps.makeRef(this);
391     }
392 
393     @Override
394     public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
395         return SortedOps.makeRef(this, comparator);
396     }
397 
398     @Override
399     public final Stream<P_OUT> limit(long maxSize) {
400         if (maxSize < 0)
401             throw new IllegalArgumentException(Long.toString(maxSize));
402         return SliceOps.makeRef(this, 0, maxSize);
403     }
404 
405     @Override
406     public final Stream<P_OUT> skip(long n) {
407         if (n < 0)
408             throw new IllegalArgumentException(Long.toString(n));
409         if (n == 0)
410             return this;
411         else
412             return SliceOps.makeRef(this, n, -1);
413     }
414 
415     // Terminal operations from Stream
416 
417     @Override
418     public void forEach(Consumer<? super P_OUT> action) {
419         evaluate(ForEachOps.makeRef(action, false));
420     }
421 
422     @Override
423     public void forEachOrdered(Consumer<? super P_OUT> action) {
424         evaluate(ForEachOps.makeRef(action, true));
425     }
426 
427     @Override
428     @SuppressWarnings("unchecked")
429     public final <A> A[] toArray(IntFunction<A[]> generator) {
430         // Since A has no relation to U (not possible to declare that A is an upper bound of U)
431         // there will be no static type checking.
432         // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
433         // throughout the code-base.
434         // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
435         // Runtime checking will be performed when an element is stored in A[], thus if A is not a
436         // super type of U an ArrayStoreException will be thrown.
437         @SuppressWarnings("rawtypes")
438         IntFunction rawGenerator = (IntFunction) generator;
439         // TODO(b/29399275): Eclipse compiler requires explicit (Node<A[]>) cast below.
440         return (A[]) Nodes.flatten((Node<A[]>) evaluateToArrayNode(rawGenerator), rawGenerator)
441                 .asArray(rawGenerator);
442     }
443 
444     @Override
445     public final Object[] toArray() {
446         return toArray(Object[]::new);
447     }
448 
449     @Override
450     public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
451         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
452     }
453 
454     @Override
455     public final boolean allMatch(Predicate<? super P_OUT> predicate) {
456         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
457     }
458 
459     @Override
460     public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
461         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
462     }
463 
464     @Override
465     public final Optional<P_OUT> findFirst() {
466         return evaluate(FindOps.makeRef(true));
467     }
468 
469     @Override
470     public final Optional<P_OUT> findAny() {
471         return evaluate(FindOps.makeRef(false));
472     }
473 
474     @Override
475     public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
476         return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
477     }
478 
479     @Override
480     public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
481         return evaluate(ReduceOps.makeRef(accumulator));
482     }
483 
484     @Override
485     public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
486         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
487     }
488 
489     @Override
490     @SuppressWarnings("unchecked")
491     public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
492         A container;
493         if (isParallel()
494                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
495                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
496             container = collector.supplier().get();
497             BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
498             forEach(u -> accumulator.accept(container, u));
499         }
500         else {
501             container = evaluate(ReduceOps.makeRef(collector));
502         }
503         return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
504                ? (R) container
505                : collector.finisher().apply(container);
506     }
507 
508     @Override
509     public final <R> R collect(Supplier<R> supplier,
510                                BiConsumer<R, ? super P_OUT> accumulator,
511                                BiConsumer<R, R> combiner) {
512         return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
513     }
514 
515     @Override
516     public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
517         return reduce(BinaryOperator.maxBy(comparator));
518     }
519 
520     @Override
521     public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
522         return reduce(BinaryOperator.minBy(comparator));
523 
524     }
525 
526     @Override
527     public final long count() {
528         return mapToLong(e -> 1L).sum();
529     }
530 
531 
532     //
533 
534     /**
535      * Source stage of a ReferencePipeline.
536      *
537      * @param <E_IN> type of elements in the upstream source
538      * @param <E_OUT> type of elements in produced by this stage
539      * @since 1.8
540      * @hide Visible for CTS testing only (OpenJDK8 tests).
541      */
542     public static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
543         /**
544          * Constructor for the source stage of a Stream.
545          *
546          * @param source {@code Supplier<Spliterator>} describing the stream
547          *               source
548          * @param sourceFlags the source flags for the stream source, described
549          *                    in {@link StreamOpFlag}
550          */
551         public Head(Supplier<? extends Spliterator<?>> source,
552              int sourceFlags, boolean parallel) {
553             super(source, sourceFlags, parallel);
554         }
555 
556         /**
557          * Constructor for the source stage of a Stream.
558          *
559          * @param source {@code Spliterator} describing the stream source
560          * @param sourceFlags the source flags for the stream source, described
561          *                    in {@link StreamOpFlag}
562          */
563         public Head(Spliterator<?> source,
564              int sourceFlags, boolean parallel) {
565             super(source, sourceFlags, parallel);
566         }
567 
568         @Override
569         public final boolean opIsStateful() {
570             throw new UnsupportedOperationException();
571         }
572 
573         @Override
574         public final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
575             throw new UnsupportedOperationException();
576         }
577 
578         // Optimized sequential terminal operations for the head of the pipeline
579 
580         @Override
581         public void forEach(Consumer<? super E_OUT> action) {
582             if (!isParallel()) {
583                 sourceStageSpliterator().forEachRemaining(action);
584             }
585             else {
586                 super.forEach(action);
587             }
588         }
589 
590         @Override
591         public void forEachOrdered(Consumer<? super E_OUT> action) {
592             if (!isParallel()) {
593                 sourceStageSpliterator().forEachRemaining(action);
594             }
595             else {
596                 super.forEachOrdered(action);
597             }
598         }
599     }
600 
601     /**
602      * Base class for a stateless intermediate stage of a Stream.
603      *
604      * @param <E_IN> type of elements in the upstream source
605      * @param <E_OUT> type of elements in produced by this stage
606      * @since 1.8
607      * @hide Visible for CTS testing only (OpenJDK8 tests).
608      */
609     public abstract static class StatelessOp<E_IN, E_OUT>
610             extends ReferencePipeline<E_IN, E_OUT> {
611         /**
612          * Construct a new Stream by appending a stateless intermediate
613          * operation to an existing stream.
614          *
615          * @param upstream The upstream pipeline stage
616          * @param inputShape The stream shape for the upstream pipeline stage
617          * @param opFlags Operation flags for the new stage
618          */
619         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
620                     StreamShape inputShape,
621                     int opFlags) {
622             super(upstream, opFlags);
623             assert upstream.getOutputShape() == inputShape;
624         }
625 
626         @Override
627         public final boolean opIsStateful() {
628             return false;
629         }
630     }
631 
632     /**
633      * Base class for a stateful intermediate stage of a Stream.
634      *
635      * @param <E_IN> type of elements in the upstream source
636      * @param <E_OUT> type of elements in produced by this stage
637      * @since 1.8
638      * @hide Visible for CTS testing only (OpenJDK8 tests).
639      */
640     public abstract static class StatefulOp<E_IN, E_OUT>
641             extends ReferencePipeline<E_IN, E_OUT> {
642         /**
643          * Construct a new Stream by appending a stateful intermediate operation
644          * to an existing stream.
645          * @param upstream The upstream pipeline stage
646          * @param inputShape The stream shape for the upstream pipeline stage
647          * @param opFlags Operation flags for the new stage
648          */
649         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
650                    StreamShape inputShape,
651                    int opFlags) {
652             super(upstream, opFlags);
653             assert upstream.getOutputShape() == inputShape;
654         }
655 
656         @Override
657         public final boolean opIsStateful() {
658             return true;
659         }
660 
661         @Override
662         public abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
663                                                        Spliterator<P_IN> spliterator,
664                                                        IntFunction<E_OUT[]> generator);
665     }
666 }
667