1 /*
2  * Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.IntSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.OptionalInt;
31 import java.util.PrimitiveIterator;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.function.BiConsumer;
35 import java.util.function.BinaryOperator;
36 import java.util.function.IntBinaryOperator;
37 import java.util.function.IntConsumer;
38 import java.util.function.IntFunction;
39 import java.util.function.IntPredicate;
40 import java.util.function.IntToDoubleFunction;
41 import java.util.function.IntToLongFunction;
42 import java.util.function.IntUnaryOperator;
43 import java.util.function.ObjIntConsumer;
44 import java.util.function.Supplier;
45 
46 /**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code int}.
49  *
50  * @param <E_IN> type of elements in the upstream source
51  * @since 1.8
52  * @hide Visible for CTS testing only (OpenJDK8 tests).
53  */
54 public abstract class IntPipeline<E_IN>
55         extends AbstractPipeline<E_IN, Integer, IntStream>
56         implements IntStream {
57 
58     /**
59      * Constructor for the head of a stream pipeline.
60      *
61      * @param source {@code Supplier<Spliterator>} describing the stream source
62      * @param sourceFlags The source flags for the stream source, described in
63      *        {@link StreamOpFlag}
64      * @param parallel {@code true} if the pipeline is parallel
65      */
IntPipeline(Supplier<? extends Spliterator<Integer>> source, int sourceFlags, boolean parallel)66     IntPipeline(Supplier<? extends Spliterator<Integer>> source,
67                 int sourceFlags, boolean parallel) {
68         super(source, sourceFlags, parallel);
69     }
70 
71     /**
72      * Constructor for the head of a stream pipeline.
73      *
74      * @param source {@code Spliterator} describing the stream source
75      * @param sourceFlags The source flags for the stream source, described in
76      *        {@link StreamOpFlag}
77      * @param parallel {@code true} if the pipeline is parallel
78      */
IntPipeline(Spliterator<Integer> source, int sourceFlags, boolean parallel)79     IntPipeline(Spliterator<Integer> source,
80                 int sourceFlags, boolean parallel) {
81         super(source, sourceFlags, parallel);
82     }
83 
84     /**
85      * Constructor for appending an intermediate operation onto an existing
86      * pipeline.
87      *
88      * @param upstream the upstream element source
89      * @param opFlags the operation flags for the new operation
90      */
IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)91     IntPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
92         super(upstream, opFlags);
93     }
94 
95     /**
96      * Adapt a {@code Sink<Integer> to an {@code IntConsumer}, ideally simply
97      * by casting.
98      */
adapt(Sink<Integer> sink)99     private static IntConsumer adapt(Sink<Integer> sink) {
100         if (sink instanceof IntConsumer) {
101             return (IntConsumer) sink;
102         }
103         else {
104             if (Tripwire.ENABLED)
105                 Tripwire.trip(AbstractPipeline.class,
106                               "using IntStream.adapt(Sink<Integer> s)");
107             return sink::accept;
108         }
109     }
110 
111     /**
112      * Adapt a {@code Spliterator<Integer>} to a {@code Spliterator.OfInt}.
113      *
114      * @implNote
115      * The implementation attempts to cast to a Spliterator.OfInt, and throws an
116      * exception if this cast is not possible.
117      */
adapt(Spliterator<Integer> s)118     private static Spliterator.OfInt adapt(Spliterator<Integer> s) {
119         if (s instanceof Spliterator.OfInt) {
120             return (Spliterator.OfInt) s;
121         }
122         else {
123             if (Tripwire.ENABLED)
124                 Tripwire.trip(AbstractPipeline.class,
125                               "using IntStream.adapt(Spliterator<Integer> s)");
126             throw new UnsupportedOperationException("IntStream.adapt(Spliterator<Integer> s)");
127         }
128     }
129 
130 
131     // Shape-specific methods
132 
133     @Override
getOutputShape()134     public final StreamShape getOutputShape() {
135         return StreamShape.INT_VALUE;
136     }
137 
138     @Override
evaluateToNode(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Integer[]> generator)139     public final <P_IN> Node<Integer> evaluateToNode(PipelineHelper<Integer> helper,
140                                                      Spliterator<P_IN> spliterator,
141                                                      boolean flattenTree,
142                                                      IntFunction<Integer[]> generator) {
143         return Nodes.collectInt(helper, spliterator, flattenTree);
144     }
145 
146     @Override
wrap(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)147     public final <P_IN> Spliterator<Integer> wrap(PipelineHelper<Integer> ph,
148                                                   Supplier<Spliterator<P_IN>> supplier,
149                                                   boolean isParallel) {
150         return new StreamSpliterators.IntWrappingSpliterator<>(ph, supplier, isParallel);
151     }
152 
153     @Override
154     @SuppressWarnings("unchecked")
lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier)155     public final Spliterator.OfInt lazySpliterator(Supplier<? extends Spliterator<Integer>> supplier) {
156         return new StreamSpliterators.DelegatingSpliterator.OfInt((Supplier<Spliterator.OfInt>) supplier);
157     }
158 
159     @Override
forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink)160     public final void forEachWithCancel(Spliterator<Integer> spliterator, Sink<Integer> sink) {
161         Spliterator.OfInt spl = adapt(spliterator);
162         IntConsumer adaptedSink = adapt(sink);
163         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
164     }
165 
166     @Override
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Integer[]> generator)167     public final Node.Builder<Integer> makeNodeBuilder(long exactSizeIfKnown,
168                                                        IntFunction<Integer[]> generator) {
169         return Nodes.intBuilder(exactSizeIfKnown);
170     }
171 
172 
173     // IntStream
174 
175     @Override
iterator()176     public final PrimitiveIterator.OfInt iterator() {
177         return Spliterators.iterator(spliterator());
178     }
179 
180     @Override
spliterator()181     public final Spliterator.OfInt spliterator() {
182         return adapt(super.spliterator());
183     }
184 
185     // Stateless intermediate ops from IntStream
186 
187     @Override
asLongStream()188     public final LongStream asLongStream() {
189         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
190                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
191             @Override
192             public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
193                 return new Sink.ChainedInt<Long>(sink) {
194                     @Override
195                     public void accept(int t) {
196                         downstream.accept((long) t);
197                     }
198                 };
199             }
200         };
201     }
202 
203     @Override
204     public final DoubleStream asDoubleStream() {
205         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
206                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
207             @Override
208             public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
209                 return new Sink.ChainedInt<Double>(sink) {
210                     @Override
211                     public void accept(int t) {
212                         downstream.accept((double) t);
213                     }
214                 };
215             }
216         };
217     }
218 
219     @Override
220     public final Stream<Integer> boxed() {
221         return mapToObj(Integer::valueOf);
222     }
223 
224     @Override
225     public final IntStream map(IntUnaryOperator mapper) {
226         Objects.requireNonNull(mapper);
227         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
228                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
229             @Override
230             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
231                 return new Sink.ChainedInt<Integer>(sink) {
232                     @Override
233                     public void accept(int t) {
234                         downstream.accept(mapper.applyAsInt(t));
235                     }
236                 };
237             }
238         };
239     }
240 
241     @Override
242     public final <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
243         Objects.requireNonNull(mapper);
244         return new ReferencePipeline.StatelessOp<Integer, U>(this, StreamShape.INT_VALUE,
245                                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
246             @Override
247             public Sink<Integer> opWrapSink(int flags, Sink<U> sink) {
248                 return new Sink.ChainedInt<U>(sink) {
249                     @Override
250                     public void accept(int t) {
251                         downstream.accept(mapper.apply(t));
252                     }
253                 };
254             }
255         };
256     }
257 
258     @Override
259     public final LongStream mapToLong(IntToLongFunction mapper) {
260         Objects.requireNonNull(mapper);
261         return new LongPipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
262                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
263             @Override
264             public Sink<Integer> opWrapSink(int flags, Sink<Long> sink) {
265                 return new Sink.ChainedInt<Long>(sink) {
266                     @Override
267                     public void accept(int t) {
268                         downstream.accept(mapper.applyAsLong(t));
269                     }
270                 };
271             }
272         };
273     }
274 
275     @Override
276     public final DoubleStream mapToDouble(IntToDoubleFunction mapper) {
277         Objects.requireNonNull(mapper);
278         return new DoublePipeline.StatelessOp<Integer>(this, StreamShape.INT_VALUE,
279                                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
280             @Override
281             public Sink<Integer> opWrapSink(int flags, Sink<Double> sink) {
282                 return new Sink.ChainedInt<Double>(sink) {
283                     @Override
284                     public void accept(int t) {
285                         downstream.accept(mapper.applyAsDouble(t));
286                     }
287                 };
288             }
289         };
290     }
291 
292     @Override
293     public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
294         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
295                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
296             @Override
297             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
298                 return new Sink.ChainedInt<Integer>(sink) {
299                     @Override
300                     public void begin(long size) {
301                         downstream.begin(-1);
302                     }
303 
304                     @Override
305                     public void accept(int t) {
306                         try (IntStream result = mapper.apply(t)) {
307                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
308                             if (result != null)
309                                 result.sequential().forEach(i -> downstream.accept(i));
310                         }
311                     }
312                 };
313             }
314         };
315     }
316 
317     @Override
318     public IntStream unordered() {
319         if (!isOrdered())
320             return this;
321         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE, StreamOpFlag.NOT_ORDERED) {
322             @Override
323             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
324                 return sink;
325             }
326         };
327     }
328 
329     @Override
330     public final IntStream filter(IntPredicate predicate) {
331         Objects.requireNonNull(predicate);
332         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
333                                         StreamOpFlag.NOT_SIZED) {
334             @Override
335             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
336                 return new Sink.ChainedInt<Integer>(sink) {
337                     @Override
338                     public void begin(long size) {
339                         downstream.begin(-1);
340                     }
341 
342                     @Override
343                     public void accept(int t) {
344                         if (predicate.test(t))
345                             downstream.accept(t);
346                     }
347                 };
348             }
349         };
350     }
351 
352     @Override
353     public final IntStream peek(IntConsumer action) {
354         Objects.requireNonNull(action);
355         return new StatelessOp<Integer>(this, StreamShape.INT_VALUE,
356                                         0) {
357             @Override
358             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
359                 return new Sink.ChainedInt<Integer>(sink) {
360                     @Override
361                     public void accept(int t) {
362                         action.accept(t);
363                         downstream.accept(t);
364                     }
365                 };
366             }
367         };
368     }
369 
370     // Stateful intermediate ops from IntStream
371 
372     @Override
373     public final IntStream limit(long maxSize) {
374         if (maxSize < 0)
375             throw new IllegalArgumentException(Long.toString(maxSize));
376         return SliceOps.makeInt(this, 0, maxSize);
377     }
378 
379     @Override
380     public final IntStream skip(long n) {
381         if (n < 0)
382             throw new IllegalArgumentException(Long.toString(n));
383         if (n == 0)
384             return this;
385         else
386             return SliceOps.makeInt(this, n, -1);
387     }
388 
389     @Override
390     public final IntStream sorted() {
391         return SortedOps.makeInt(this);
392     }
393 
394     @Override
395     public final IntStream distinct() {
396         // While functional and quick to implement, this approach is not very efficient.
397         // An efficient version requires an int-specific map/set implementation.
398         return boxed().distinct().mapToInt(i -> i);
399     }
400 
401     // Terminal ops from IntStream
402 
403     @Override
404     public void forEach(IntConsumer action) {
405         evaluate(ForEachOps.makeInt(action, false));
406     }
407 
408     @Override
409     public void forEachOrdered(IntConsumer action) {
410         evaluate(ForEachOps.makeInt(action, true));
411     }
412 
413     @Override
414     public final int sum() {
415         return reduce(0, Integer::sum);
416     }
417 
418     @Override
419     public final OptionalInt min() {
420         return reduce(Math::min);
421     }
422 
423     @Override
424     public final OptionalInt max() {
425         return reduce(Math::max);
426     }
427 
428     @Override
429     public final long count() {
430         return mapToLong(e -> 1L).sum();
431     }
432 
433     @Override
434     public final OptionalDouble average() {
435         long[] avg = collect(() -> new long[2],
436                              (ll, i) -> {
437                                  ll[0]++;
438                                  ll[1] += i;
439                              },
440                              (ll, rr) -> {
441                                  ll[0] += rr[0];
442                                  ll[1] += rr[1];
443                              });
444         return avg[0] > 0
445                ? OptionalDouble.of((double) avg[1] / avg[0])
446                : OptionalDouble.empty();
447     }
448 
449     @Override
450     public final IntSummaryStatistics summaryStatistics() {
451         return collect(IntSummaryStatistics::new, IntSummaryStatistics::accept,
452                        IntSummaryStatistics::combine);
453     }
454 
455     @Override
456     public final int reduce(int identity, IntBinaryOperator op) {
457         return evaluate(ReduceOps.makeInt(identity, op));
458     }
459 
460     @Override
461     public final OptionalInt reduce(IntBinaryOperator op) {
462         return evaluate(ReduceOps.makeInt(op));
463     }
464 
465     @Override
466     public final <R> R collect(Supplier<R> supplier,
467                                ObjIntConsumer<R> accumulator,
468                                BiConsumer<R, R> combiner) {
469         BinaryOperator<R> operator = (left, right) -> {
470             combiner.accept(left, right);
471             return left;
472         };
473         return evaluate(ReduceOps.makeInt(supplier, accumulator, operator));
474     }
475 
476     @Override
477     public final boolean anyMatch(IntPredicate predicate) {
478         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ANY));
479     }
480 
481     @Override
482     public final boolean allMatch(IntPredicate predicate) {
483         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.ALL));
484     }
485 
486     @Override
487     public final boolean noneMatch(IntPredicate predicate) {
488         return evaluate(MatchOps.makeInt(predicate, MatchOps.MatchKind.NONE));
489     }
490 
491     @Override
492     public final OptionalInt findFirst() {
493         return evaluate(FindOps.makeInt(true));
494     }
495 
496     @Override
497     public final OptionalInt findAny() {
498         return evaluate(FindOps.makeInt(false));
499     }
500 
501     @Override
502     public final int[] toArray() {
503         return Nodes.flattenInt((Node.OfInt) evaluateToArrayNode(Integer[]::new))
504                         .asPrimitiveArray();
505     }
506 
507     //
508 
509     /**
510      * Source stage of an IntStream.
511      *
512      * @param <E_IN> type of elements in the upstream source
513      * @since 1.8
514      * @hide Visible for CTS testing only (OpenJDK8 tests).
515      */
516     public static class Head<E_IN> extends IntPipeline<E_IN> {
517         /**
518          * Constructor for the source stage of an IntStream.
519          *
520          * @param source {@code Supplier<Spliterator>} describing the stream
521          *               source
522          * @param sourceFlags the source flags for the stream source, described
523          *                    in {@link StreamOpFlag}
524          * @param parallel {@code true} if the pipeline is parallel
525          */
526         public Head(Supplier<? extends Spliterator<Integer>> source,
527              int sourceFlags, boolean parallel) {
528             super(source, sourceFlags, parallel);
529         }
530 
531         /**
532          * Constructor for the source stage of an IntStream.
533          *
534          * @param source {@code Spliterator} describing the stream source
535          * @param sourceFlags the source flags for the stream source, described
536          *                    in {@link StreamOpFlag}
537          * @param parallel {@code true} if the pipeline is parallel
538          */
539         public Head(Spliterator<Integer> source,
540              int sourceFlags, boolean parallel) {
541             super(source, sourceFlags, parallel);
542         }
543 
544         @Override
545         public final boolean opIsStateful() {
546             throw new UnsupportedOperationException();
547         }
548 
549         @Override
550         public final Sink<E_IN> opWrapSink(int flags, Sink<Integer> sink) {
551             throw new UnsupportedOperationException();
552         }
553 
554         // Optimized sequential terminal operations for the head of the pipeline
555 
556         @Override
557         public void forEach(IntConsumer action) {
558             if (!isParallel()) {
559                 adapt(sourceStageSpliterator()).forEachRemaining(action);
560             }
561             else {
562                 super.forEach(action);
563             }
564         }
565 
566         @Override
567         public void forEachOrdered(IntConsumer action) {
568             if (!isParallel()) {
569                 adapt(sourceStageSpliterator()).forEachRemaining(action);
570             }
571             else {
572                 super.forEachOrdered(action);
573             }
574         }
575     }
576 
577     /**
578      * Base class for a stateless intermediate stage of an IntStream
579      *
580      * @param <E_IN> type of elements in the upstream source
581      * @since 1.8
582      * @hide Visible for CTS testing only (OpenJDK8 tests).
583      */
584     public abstract static class StatelessOp<E_IN> extends IntPipeline<E_IN> {
585         /**
586          * Construct a new IntStream by appending a stateless intermediate
587          * operation to an existing stream.
588          * @param upstream The upstream pipeline stage
589          * @param inputShape The stream shape for the upstream pipeline stage
590          * @param opFlags Operation flags for the new stage
591          */
592         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
593                     StreamShape inputShape,
594                     int opFlags) {
595             super(upstream, opFlags);
596             assert upstream.getOutputShape() == inputShape;
597         }
598 
599         @Override
600         public final boolean opIsStateful() {
601             return false;
602         }
603     }
604 
605     /**
606      * Base class for a stateful intermediate stage of an IntStream.
607      *
608      * @param <E_IN> type of elements in the upstream source
609      * @since 1.8
610      * @hide Visible for CTS testing only (OpenJDK8 tests).
611      */
612     public abstract static class StatefulOp<E_IN> extends IntPipeline<E_IN> {
613         /**
614          * Construct a new IntStream by appending a stateful intermediate
615          * operation to an existing stream.
616          * @param upstream The upstream pipeline stage
617          * @param inputShape The stream shape for the upstream pipeline stage
618          * @param opFlags Operation flags for the new stage
619          */
620         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
621                    StreamShape inputShape,
622                    int opFlags) {
623             super(upstream, opFlags);
624             assert upstream.getOutputShape() == inputShape;
625         }
626 
627         @Override
628         public final boolean opIsStateful() {
629             return true;
630         }
631 
632         @Override
633         public abstract <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
634                                                          Spliterator<P_IN> spliterator,
635                                                          IntFunction<Integer[]> generator);
636     }
637 }
638