1 /*
2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.LongSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.OptionalLong;
31 import java.util.PrimitiveIterator;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.function.BiConsumer;
35 import java.util.function.BinaryOperator;
36 import java.util.function.IntFunction;
37 import java.util.function.LongBinaryOperator;
38 import java.util.function.LongConsumer;
39 import java.util.function.LongFunction;
40 import java.util.function.LongPredicate;
41 import java.util.function.LongToDoubleFunction;
42 import java.util.function.LongToIntFunction;
43 import java.util.function.LongUnaryOperator;
44 import java.util.function.ObjLongConsumer;
45 import java.util.function.Supplier;
46 
47 /**
48  * Abstract base class for an intermediate pipeline stage or pipeline source
49  * stage implementing whose elements are of type {@code long}.
50  *
51  * @param <E_IN> type of elements in the upstream source
52  * @since 1.8
53  * @hide Visible for CTS testing only (OpenJDK8 tests).
54  */
55 public abstract class LongPipeline<E_IN>
56         extends AbstractPipeline<E_IN, Long, LongStream>
57         implements LongStream {
58 
59     /**
60      * Constructor for the head of a stream pipeline.
61      *
62      * @param source {@code Supplier<Spliterator>} describing the stream source
63      * @param sourceFlags the source flags for the stream source, described in
64      *        {@link StreamOpFlag}
65      * @param parallel {@code true} if the pipeline is parallel
66      */
LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags, boolean parallel)67     LongPipeline(Supplier<? extends Spliterator<Long>> source,
68                  int sourceFlags, boolean parallel) {
69         super(source, sourceFlags, parallel);
70     }
71 
72     /**
73      * Constructor for the head of a stream pipeline.
74      *
75      * @param source {@code Spliterator} describing the stream source
76      * @param sourceFlags the source flags for the stream source, described in
77      *        {@link StreamOpFlag}
78      * @param parallel {@code true} if the pipeline is parallel
79      */
LongPipeline(Spliterator<Long> source, int sourceFlags, boolean parallel)80     LongPipeline(Spliterator<Long> source,
81                  int sourceFlags, boolean parallel) {
82         super(source, sourceFlags, parallel);
83     }
84 
85     /**
86      * Constructor for appending an intermediate operation onto an existing pipeline.
87      *
88      * @param upstream the upstream element source.
89      * @param opFlags the operation flags
90      */
LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)91     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
92         super(upstream, opFlags);
93     }
94 
95     /**
96      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
97      * by casting.
98      */
adapt(Sink<Long> sink)99     private static LongConsumer adapt(Sink<Long> sink) {
100         if (sink instanceof LongConsumer) {
101             return (LongConsumer) sink;
102         } else {
103             if (Tripwire.ENABLED)
104                 Tripwire.trip(AbstractPipeline.class,
105                               "using LongStream.adapt(Sink<Long> s)");
106             return sink::accept;
107         }
108     }
109 
110     /**
111      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
112      *
113      * @implNote
114      * The implementation attempts to cast to a Spliterator.OfLong, and throws
115      * an exception if this cast is not possible.
116      */
adapt(Spliterator<Long> s)117     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
118         if (s instanceof Spliterator.OfLong) {
119             return (Spliterator.OfLong) s;
120         } else {
121             if (Tripwire.ENABLED)
122                 Tripwire.trip(AbstractPipeline.class,
123                               "using LongStream.adapt(Spliterator<Long> s)");
124             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
125         }
126     }
127 
128 
129     // Shape-specific methods
130 
131     @Override
getOutputShape()132     public final StreamShape getOutputShape() {
133         return StreamShape.LONG_VALUE;
134     }
135 
136     @Override
evaluateToNode(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Long[]> generator)137     public final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
138                                            Spliterator<P_IN> spliterator,
139                                            boolean flattenTree,
140                                            IntFunction<Long[]> generator) {
141         return Nodes.collectLong(helper, spliterator, flattenTree);
142     }
143 
144     @Override
wrap(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)145     public final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
146                                         Supplier<Spliterator<P_IN>> supplier,
147                                         boolean isParallel) {
148         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
149     }
150 
151     @Override
152     @SuppressWarnings("unchecked")
lazySpliterator(Supplier<? extends Spliterator<Long>> supplier)153     public final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
154         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
155     }
156 
157     @Override
forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink)158     public final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
159         Spliterator.OfLong spl = adapt(spliterator);
160         LongConsumer adaptedSink =  adapt(sink);
161         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
162     }
163 
164     @Override
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator)165     public final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
166         return Nodes.longBuilder(exactSizeIfKnown);
167     }
168 
169 
170     // LongStream
171 
172     @Override
iterator()173     public final PrimitiveIterator.OfLong iterator() {
174         return Spliterators.iterator(spliterator());
175     }
176 
177     @Override
spliterator()178     public final Spliterator.OfLong spliterator() {
179         return adapt(super.spliterator());
180     }
181 
182     // Stateless intermediate ops from LongStream
183 
184     @Override
asDoubleStream()185     public final DoubleStream asDoubleStream() {
186         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
187                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
188             @Override
189             public Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
190                 return new Sink.ChainedLong<Double>(sink) {
191                     @Override
192                     public void accept(long t) {
193                         downstream.accept((double) t);
194                     }
195                 };
196             }
197         };
198     }
199 
200     @Override
201     public final Stream<Long> boxed() {
202         return mapToObj(Long::valueOf);
203     }
204 
205     @Override
206     public final LongStream map(LongUnaryOperator mapper) {
207         Objects.requireNonNull(mapper);
208         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
209                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
210             @Override
211             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
212                 return new Sink.ChainedLong<Long>(sink) {
213                     @Override
214                     public void accept(long t) {
215                         downstream.accept(mapper.applyAsLong(t));
216                     }
217                 };
218             }
219         };
220     }
221 
222     @Override
223     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
224         Objects.requireNonNull(mapper);
225         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
226                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
227             @Override
228             public Sink<Long> opWrapSink(int flags, Sink<U> sink) {
229                 return new Sink.ChainedLong<U>(sink) {
230                     @Override
231                     public void accept(long t) {
232                         downstream.accept(mapper.apply(t));
233                     }
234                 };
235             }
236         };
237     }
238 
239     @Override
240     public final IntStream mapToInt(LongToIntFunction mapper) {
241         Objects.requireNonNull(mapper);
242         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
243                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
244             @Override
245             public Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
246                 return new Sink.ChainedLong<Integer>(sink) {
247                     @Override
248                     public void accept(long t) {
249                         downstream.accept(mapper.applyAsInt(t));
250                     }
251                 };
252             }
253         };
254     }
255 
256     @Override
257     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
258         Objects.requireNonNull(mapper);
259         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
260                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
261             @Override
262             public Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
263                 return new Sink.ChainedLong<Double>(sink) {
264                     @Override
265                     public void accept(long t) {
266                         downstream.accept(mapper.applyAsDouble(t));
267                     }
268                 };
269             }
270         };
271     }
272 
273     @Override
274     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
275         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
276                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
277             @Override
278             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
279                 return new Sink.ChainedLong<Long>(sink) {
280                     @Override
281                     public void begin(long size) {
282                         downstream.begin(-1);
283                     }
284 
285                     @Override
286                     public void accept(long t) {
287                         try (LongStream result = mapper.apply(t)) {
288                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
289                             if (result != null)
290                                 result.sequential().forEach(i -> downstream.accept(i));
291                         }
292                     }
293                 };
294             }
295         };
296     }
297 
298     @Override
299     public LongStream unordered() {
300         if (!isOrdered())
301             return this;
302         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
303             @Override
304             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
305                 return sink;
306             }
307         };
308     }
309 
310     @Override
311     public final LongStream filter(LongPredicate predicate) {
312         Objects.requireNonNull(predicate);
313         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
314                                      StreamOpFlag.NOT_SIZED) {
315             @Override
316             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
317                 return new Sink.ChainedLong<Long>(sink) {
318                     @Override
319                     public void begin(long size) {
320                         downstream.begin(-1);
321                     }
322 
323                     @Override
324                     public void accept(long t) {
325                         if (predicate.test(t))
326                             downstream.accept(t);
327                     }
328                 };
329             }
330         };
331     }
332 
333     @Override
334     public final LongStream peek(LongConsumer action) {
335         Objects.requireNonNull(action);
336         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
337                                      0) {
338             @Override
339             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
340                 return new Sink.ChainedLong<Long>(sink) {
341                     @Override
342                     public void accept(long t) {
343                         action.accept(t);
344                         downstream.accept(t);
345                     }
346                 };
347             }
348         };
349     }
350 
351     // Stateful intermediate ops from LongStream
352 
353     @Override
354     public final LongStream limit(long maxSize) {
355         if (maxSize < 0)
356             throw new IllegalArgumentException(Long.toString(maxSize));
357         return SliceOps.makeLong(this, 0, maxSize);
358     }
359 
360     @Override
361     public final LongStream skip(long n) {
362         if (n < 0)
363             throw new IllegalArgumentException(Long.toString(n));
364         if (n == 0)
365             return this;
366         else
367             return SliceOps.makeLong(this, n, -1);
368     }
369 
370     @Override
371     public final LongStream sorted() {
372         return SortedOps.makeLong(this);
373     }
374 
375     @Override
376     public final LongStream distinct() {
377         // While functional and quick to implement, this approach is not very efficient.
378         // An efficient version requires a long-specific map/set implementation.
379         return boxed().distinct().mapToLong(i -> (long) i);
380     }
381 
382     // Terminal ops from LongStream
383 
384     @Override
385     public void forEach(LongConsumer action) {
386         evaluate(ForEachOps.makeLong(action, false));
387     }
388 
389     @Override
390     public void forEachOrdered(LongConsumer action) {
391         evaluate(ForEachOps.makeLong(action, true));
392     }
393 
394     @Override
395     public final long sum() {
396         // use better algorithm to compensate for intermediate overflow?
397         return reduce(0, Long::sum);
398     }
399 
400     @Override
401     public final OptionalLong min() {
402         return reduce(Math::min);
403     }
404 
405     @Override
406     public final OptionalLong max() {
407         return reduce(Math::max);
408     }
409 
410     @Override
411     public final OptionalDouble average() {
412         long[] avg = collect(() -> new long[2],
413                              (ll, i) -> {
414                                  ll[0]++;
415                                  ll[1] += i;
416                              },
417                              (ll, rr) -> {
418                                  ll[0] += rr[0];
419                                  ll[1] += rr[1];
420                              });
421         return avg[0] > 0
422                ? OptionalDouble.of((double) avg[1] / avg[0])
423                : OptionalDouble.empty();
424     }
425 
426     @Override
427     public final long count() {
428         return map(e -> 1L).sum();
429     }
430 
431     @Override
432     public final LongSummaryStatistics summaryStatistics() {
433         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
434                        LongSummaryStatistics::combine);
435     }
436 
437     @Override
438     public final long reduce(long identity, LongBinaryOperator op) {
439         return evaluate(ReduceOps.makeLong(identity, op));
440     }
441 
442     @Override
443     public final OptionalLong reduce(LongBinaryOperator op) {
444         return evaluate(ReduceOps.makeLong(op));
445     }
446 
447     @Override
448     public final <R> R collect(Supplier<R> supplier,
449                                ObjLongConsumer<R> accumulator,
450                                BiConsumer<R, R> combiner) {
451         BinaryOperator<R> operator = (left, right) -> {
452             combiner.accept(left, right);
453             return left;
454         };
455         return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
456     }
457 
458     @Override
459     public final boolean anyMatch(LongPredicate predicate) {
460         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
461     }
462 
463     @Override
464     public final boolean allMatch(LongPredicate predicate) {
465         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
466     }
467 
468     @Override
469     public final boolean noneMatch(LongPredicate predicate) {
470         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
471     }
472 
473     @Override
474     public final OptionalLong findFirst() {
475         return evaluate(FindOps.makeLong(true));
476     }
477 
478     @Override
479     public final OptionalLong findAny() {
480         return evaluate(FindOps.makeLong(false));
481     }
482 
483     @Override
484     public final long[] toArray() {
485         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
486                 .asPrimitiveArray();
487     }
488 
489 
490     //
491 
492     /**
493      * Source stage of a LongPipeline.
494      *
495      * @param <E_IN> type of elements in the upstream source
496      * @since 1.8
497      * @hide Visibility for CTS only (OpenJDK 8 streams tests).
498      */
499     public static class Head<E_IN> extends LongPipeline<E_IN> {
500         /**
501          * Constructor for the source stage of a LongStream.
502          *
503          * @param source {@code Supplier<Spliterator>} describing the stream
504          *               source
505          * @param sourceFlags the source flags for the stream source, described
506          *                    in {@link StreamOpFlag}
507          * @param parallel {@code true} if the pipeline is parallel
508          */
509         public Head(Supplier<? extends Spliterator<Long>> source,
510              int sourceFlags, boolean parallel) {
511             super(source, sourceFlags, parallel);
512         }
513 
514         /**
515          * Constructor for the source stage of a LongStream.
516          *
517          * @param source {@code Spliterator} describing the stream source
518          * @param sourceFlags the source flags for the stream source, described
519          *                    in {@link StreamOpFlag}
520          * @param parallel {@code true} if the pipeline is parallel
521          */
522         public Head(Spliterator<Long> source,
523              int sourceFlags, boolean parallel) {
524             super(source, sourceFlags, parallel);
525         }
526 
527         @Override
528         public final boolean opIsStateful() {
529             throw new UnsupportedOperationException();
530         }
531 
532         @Override
533         public final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
534             throw new UnsupportedOperationException();
535         }
536 
537         // Optimized sequential terminal operations for the head of the pipeline
538 
539         @Override
540         public void forEach(LongConsumer action) {
541             if (!isParallel()) {
542                 adapt(sourceStageSpliterator()).forEachRemaining(action);
543             } else {
544                 super.forEach(action);
545             }
546         }
547 
548         @Override
549         public void forEachOrdered(LongConsumer action) {
550             if (!isParallel()) {
551                 adapt(sourceStageSpliterator()).forEachRemaining(action);
552             } else {
553                 super.forEachOrdered(action);
554             }
555         }
556     }
557 
558     /** Base class for a stateless intermediate stage of a LongStream.
559      *
560      * @param <E_IN> type of elements in the upstream source
561      * @since 1.8
562      * @hide Visible for CTS testing only (OpenJDK8 tests).
563      */
564     public abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
565         /**
566          * Construct a new LongStream by appending a stateless intermediate
567          * operation to an existing stream.
568          * @param upstream The upstream pipeline stage
569          * @param inputShape The stream shape for the upstream pipeline stage
570          * @param opFlags Operation flags for the new stage
571          */
572         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
573                     StreamShape inputShape,
574                     int opFlags) {
575             super(upstream, opFlags);
576             assert upstream.getOutputShape() == inputShape;
577         }
578 
579         @Override
580         public final boolean opIsStateful() {
581             return false;
582         }
583     }
584 
585     /**
586      * Base class for a stateful intermediate stage of a LongStream.
587      *
588      * @param <E_IN> type of elements in the upstream source
589      * @since 1.8
590      * @hide Visible for CTS testing only (OpenJDK8 tests).
591      */
592     public abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
593         /**
594          * Construct a new LongStream by appending a stateful intermediate
595          * operation to an existing stream.
596          * @param upstream The upstream pipeline stage
597          * @param inputShape The stream shape for the upstream pipeline stage
598          * @param opFlags Operation flags for the new stage
599          * @hide Visible for CTS testing only (OpenJDK8 tests).
600          */
601         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
602                    StreamShape inputShape,
603                    int opFlags) {
604             super(upstream, opFlags);
605             assert upstream.getOutputShape() == inputShape;
606         }
607 
608         @Override
609         public final boolean opIsStateful() {
610             return true;
611         }
612 
613         @Override
614         public abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
615                                                       Spliterator<P_IN> spliterator,
616                                                       IntFunction<Long[]> generator);
617     }
618 }
619