1 /*
2  * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Objects;
28 import java.util.Optional;
29 import java.util.OptionalDouble;
30 import java.util.OptionalInt;
31 import java.util.OptionalLong;
32 import java.util.Spliterator;
33 import java.util.concurrent.CountedCompleter;
34 import java.util.function.BiConsumer;
35 import java.util.function.BiFunction;
36 import java.util.function.BinaryOperator;
37 import java.util.function.DoubleBinaryOperator;
38 import java.util.function.IntBinaryOperator;
39 import java.util.function.LongBinaryOperator;
40 import java.util.function.ObjDoubleConsumer;
41 import java.util.function.ObjIntConsumer;
42 import java.util.function.ObjLongConsumer;
43 import java.util.function.Supplier;
44 
45 /**
46  * Factory for creating instances of {@code TerminalOp} that implement
47  * reductions.
48  *
49  * @since 1.8
50  */
51 final class ReduceOps {
52 
ReduceOps()53     private ReduceOps() { }
54 
55     /**
56      * Constructs a {@code TerminalOp} that implements a functional reduce on
57      * reference values.
58      *
59      * @param <T> the type of the input elements
60      * @param <U> the type of the result
61      * @param seed the identity element for the reduction
62      * @param reducer the accumulating function that incorporates an additional
63      *        input element into the result
64      * @param combiner the combining function that combines two intermediate
65      *        results
66      * @return a {@code TerminalOp} implementing the reduction
67      */
68     public static <T, U> TerminalOp<T, U>
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner)69     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
70         Objects.requireNonNull(reducer);
71         Objects.requireNonNull(combiner);
72         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
73             @Override
74             public void begin(long size) {
75                 state = seed;
76             }
77 
78             @Override
79             public void accept(T t) {
80                 state = reducer.apply(state, t);
81             }
82 
83             @Override
84             public void combine(ReducingSink other) {
85                 state = combiner.apply(state, other.state);
86             }
87         }
88         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
89             @Override
90             public ReducingSink makeSink() {
91                 return new ReducingSink();
92             }
93         };
94     }
95 
96     /**
97      * Constructs a {@code TerminalOp} that implements a functional reduce on
98      * reference values producing an optional reference result.
99      *
100      * @param <T> The type of the input elements, and the type of the result
101      * @param operator The reducing function
102      * @return A {@code TerminalOp} implementing the reduction
103      */
104     public static <T> TerminalOp<T, Optional<T>>
105     makeRef(BinaryOperator<T> operator) {
106         Objects.requireNonNull(operator);
107         class ReducingSink
108                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
109             private boolean empty;
110             private T state;
111 
112             public void begin(long size) {
113                 empty = true;
114                 state = null;
115             }
116 
117             @Override
118             public void accept(T t) {
119                 if (empty) {
120                     empty = false;
121                     state = t;
122                 } else {
123                     state = operator.apply(state, t);
124                 }
125             }
126 
127             @Override
128             public Optional<T> get() {
129                 return empty ? Optional.empty() : Optional.of(state);
130             }
131 
132             @Override
133             public void combine(ReducingSink other) {
134                 if (!other.empty)
135                     accept(other.state);
136             }
137         }
138         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139             @Override
140             public ReducingSink makeSink() {
141                 return new ReducingSink();
142             }
143         };
144     }
145 
146     /**
147      * Constructs a {@code TerminalOp} that implements a mutable reduce on
148      * reference values.
149      *
150      * @param <T> the type of the input elements
151      * @param <I> the type of the intermediate reduction result
152      * @param collector a {@code Collector} defining the reduction
153      * @return a {@code ReduceOp} implementing the reduction
154      */
155     public static <T, I> TerminalOp<T, I>
156     makeRef(Collector<? super T, I, ?> collector) {
157         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
158         BiConsumer<I, ? super T> accumulator = collector.accumulator();
159         BinaryOperator<I> combiner = collector.combiner();
160         class ReducingSink extends Box<I>
161                 implements AccumulatingSink<T, I, ReducingSink> {
162             @Override
163             public void begin(long size) {
164                 state = supplier.get();
165             }
166 
167             @Override
168             public void accept(T t) {
169                 accumulator.accept(state, t);
170             }
171 
172             @Override
173             public void combine(ReducingSink other) {
174                 state = combiner.apply(state, other.state);
175             }
176         }
177         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
178             @Override
179             public ReducingSink makeSink() {
180                 return new ReducingSink();
181             }
182 
183             @Override
184             public int getOpFlags() {
185                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
186                        ? StreamOpFlag.NOT_ORDERED
187                        : 0;
188             }
189         };
190     }
191 
192     /**
193      * Constructs a {@code TerminalOp} that implements a mutable reduce on
194      * reference values.
195      *
196      * @param <T> the type of the input elements
197      * @param <R> the type of the result
198      * @param seedFactory a factory to produce a new base accumulator
199      * @param accumulator a function to incorporate an element into an
200      *        accumulator
201      * @param reducer a function to combine an accumulator into another
202      * @return a {@code TerminalOp} implementing the reduction
203      */
204     public static <T, R> TerminalOp<T, R>
205     makeRef(Supplier<R> seedFactory,
206             BiConsumer<R, ? super T> accumulator,
207             BiConsumer<R,R> reducer) {
208         Objects.requireNonNull(seedFactory);
209         Objects.requireNonNull(accumulator);
210         Objects.requireNonNull(reducer);
211         class ReducingSink extends Box<R>
212                 implements AccumulatingSink<T, R, ReducingSink> {
213             @Override
214             public void begin(long size) {
215                 state = seedFactory.get();
216             }
217 
218             @Override
219             public void accept(T t) {
220                 accumulator.accept(state, t);
221             }
222 
223             @Override
224             public void combine(ReducingSink other) {
225                 reducer.accept(state, other.state);
226             }
227         }
228         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
229             @Override
230             public ReducingSink makeSink() {
231                 return new ReducingSink();
232             }
233         };
234     }
235 
236     /**
237      * Constructs a {@code TerminalOp} that counts the number of stream
238      * elements.  If the size of the pipeline is known then count is the size
239      * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
241      * {@link CountingSink}.
242      *
243      * @param <T> the type of the input elements
244      * @return a {@code TerminalOp} implementing the counting
245      */
246     public static <T> TerminalOp<T, Long>
247     makeRefCounting() {
248         return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) {
249             @Override
250             public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); }
251 
252             @Override
253             public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
254                                                   Spliterator<P_IN> spliterator) {
255                 long size = helper.exactOutputSizeIfKnown(spliterator);
256                 if (size != -1)
257                     return size;
258                 return super.evaluateSequential(helper, spliterator);
259             }
260 
261             @Override
262             public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
263                                                 Spliterator<P_IN> spliterator) {
264                 long size = helper.exactOutputSizeIfKnown(spliterator);
265                 if (size != -1)
266                     return size;
267                 return super.evaluateParallel(helper, spliterator);
268             }
269 
270             @Override
271             public int getOpFlags() {
272                 return StreamOpFlag.NOT_ORDERED;
273             }
274         };
275     }
276 
277     /**
278      * Constructs a {@code TerminalOp} that implements a functional reduce on
279      * {@code int} values.
280      *
281      * @param identity the identity for the combining function
282      * @param operator the combining function
283      * @return a {@code TerminalOp} implementing the reduction
284      */
285     public static TerminalOp<Integer, Integer>
286     makeInt(int identity, IntBinaryOperator operator) {
287         Objects.requireNonNull(operator);
288         class ReducingSink
289                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
290             private int state;
291 
292             @Override
293             public void begin(long size) {
294                 state = identity;
295             }
296 
297             @Override
298             public void accept(int t) {
299                 state = operator.applyAsInt(state, t);
300             }
301 
302             @Override
303             public Integer get() {
304                 return state;
305             }
306 
307             @Override
308             public void combine(ReducingSink other) {
309                 accept(other.state);
310             }
311         }
312         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
313             @Override
314             public ReducingSink makeSink() {
315                 return new ReducingSink();
316             }
317         };
318     }
319 
320     /**
321      * Constructs a {@code TerminalOp} that implements a functional reduce on
322      * {@code int} values, producing an optional integer result.
323      *
324      * @param operator the combining function
325      * @return a {@code TerminalOp} implementing the reduction
326      */
327     public static TerminalOp<Integer, OptionalInt>
328     makeInt(IntBinaryOperator operator) {
329         Objects.requireNonNull(operator);
330         class ReducingSink
331                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
332             private boolean empty;
333             private int state;
334 
335             public void begin(long size) {
336                 empty = true;
337                 state = 0;
338             }
339 
340             @Override
341             public void accept(int t) {
342                 if (empty) {
343                     empty = false;
344                     state = t;
345                 }
346                 else {
347                     state = operator.applyAsInt(state, t);
348                 }
349             }
350 
351             @Override
352             public OptionalInt get() {
353                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
354             }
355 
356             @Override
357             public void combine(ReducingSink other) {
358                 if (!other.empty)
359                     accept(other.state);
360             }
361         }
362         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
363             @Override
364             public ReducingSink makeSink() {
365                 return new ReducingSink();
366             }
367         };
368     }
369 
370     /**
371      * Constructs a {@code TerminalOp} that implements a mutable reduce on
372      * {@code int} values.
373      *
374      * @param <R> The type of the result
375      * @param supplier a factory to produce a new accumulator of the result type
376      * @param accumulator a function to incorporate an int into an
377      *        accumulator
378      * @param combiner a function to combine an accumulator into another
379      * @return A {@code ReduceOp} implementing the reduction
380      */
381     public static <R> TerminalOp<Integer, R>
382     makeInt(Supplier<R> supplier,
383             ObjIntConsumer<R> accumulator,
384             BinaryOperator<R> combiner) {
385         Objects.requireNonNull(supplier);
386         Objects.requireNonNull(accumulator);
387         Objects.requireNonNull(combiner);
388         class ReducingSink extends Box<R>
389                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
390             @Override
391             public void begin(long size) {
392                 state = supplier.get();
393             }
394 
395             @Override
396             public void accept(int t) {
397                 accumulator.accept(state, t);
398             }
399 
400             @Override
401             public void combine(ReducingSink other) {
402                 state = combiner.apply(state, other.state);
403             }
404         }
405         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
406             @Override
407             public ReducingSink makeSink() {
408                 return new ReducingSink();
409             }
410         };
411     }
412 
413     /**
414      * Constructs a {@code TerminalOp} that counts the number of stream
415      * elements.  If the size of the pipeline is known then count is the size
416      * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
418      * {@link CountingSink}.
419      *
420      * @return a {@code TerminalOp} implementing the counting
421      */
422     public static TerminalOp<Integer, Long>
423     makeIntCounting() {
424         return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) {
425             @Override
426             public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); }
427 
428             @Override
429             public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
430                                                   Spliterator<P_IN> spliterator) {
431                 long size = helper.exactOutputSizeIfKnown(spliterator);
432                 if (size != -1)
433                     return size;
434                 return super.evaluateSequential(helper, spliterator);
435             }
436 
437             @Override
438             public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
439                                                 Spliterator<P_IN> spliterator) {
440                 long size = helper.exactOutputSizeIfKnown(spliterator);
441                 if (size != -1)
442                     return size;
443                 return super.evaluateParallel(helper, spliterator);
444             }
445 
446             @Override
447             public int getOpFlags() {
448                 return StreamOpFlag.NOT_ORDERED;
449             }
450         };
451     }
452 
453     /**
454      * Constructs a {@code TerminalOp} that implements a functional reduce on
455      * {@code long} values.
456      *
457      * @param identity the identity for the combining function
458      * @param operator the combining function
459      * @return a {@code TerminalOp} implementing the reduction
460      */
461     public static TerminalOp<Long, Long>
462     makeLong(long identity, LongBinaryOperator operator) {
463         Objects.requireNonNull(operator);
464         class ReducingSink
465                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
466             private long state;
467 
468             @Override
469             public void begin(long size) {
470                 state = identity;
471             }
472 
473             @Override
474             public void accept(long t) {
475                 state = operator.applyAsLong(state, t);
476             }
477 
478             @Override
479             public Long get() {
480                 return state;
481             }
482 
483             @Override
484             public void combine(ReducingSink other) {
485                 accept(other.state);
486             }
487         }
488         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
489             @Override
490             public ReducingSink makeSink() {
491                 return new ReducingSink();
492             }
493         };
494     }
495 
496     /**
497      * Constructs a {@code TerminalOp} that implements a functional reduce on
498      * {@code long} values, producing an optional long result.
499      *
500      * @param operator the combining function
501      * @return a {@code TerminalOp} implementing the reduction
502      */
503     public static TerminalOp<Long, OptionalLong>
504     makeLong(LongBinaryOperator operator) {
505         Objects.requireNonNull(operator);
506         class ReducingSink
507                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
508             private boolean empty;
509             private long state;
510 
511             public void begin(long size) {
512                 empty = true;
513                 state = 0;
514             }
515 
516             @Override
517             public void accept(long t) {
518                 if (empty) {
519                     empty = false;
520                     state = t;
521                 }
522                 else {
523                     state = operator.applyAsLong(state, t);
524                 }
525             }
526 
527             @Override
528             public OptionalLong get() {
529                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
530             }
531 
532             @Override
533             public void combine(ReducingSink other) {
534                 if (!other.empty)
535                     accept(other.state);
536             }
537         }
538         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
539             @Override
540             public ReducingSink makeSink() {
541                 return new ReducingSink();
542             }
543         };
544     }
545 
546     /**
547      * Constructs a {@code TerminalOp} that implements a mutable reduce on
548      * {@code long} values.
549      *
550      * @param <R> the type of the result
551      * @param supplier a factory to produce a new accumulator of the result type
     * @param accumulator a function to incorporate a long into an
553      *        accumulator
554      * @param combiner a function to combine an accumulator into another
555      * @return a {@code TerminalOp} implementing the reduction
556      */
557     public static <R> TerminalOp<Long, R>
558     makeLong(Supplier<R> supplier,
559              ObjLongConsumer<R> accumulator,
560              BinaryOperator<R> combiner) {
561         Objects.requireNonNull(supplier);
562         Objects.requireNonNull(accumulator);
563         Objects.requireNonNull(combiner);
564         class ReducingSink extends Box<R>
565                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
566             @Override
567             public void begin(long size) {
568                 state = supplier.get();
569             }
570 
571             @Override
572             public void accept(long t) {
573                 accumulator.accept(state, t);
574             }
575 
576             @Override
577             public void combine(ReducingSink other) {
578                 state = combiner.apply(state, other.state);
579             }
580         }
581         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
582             @Override
583             public ReducingSink makeSink() {
584                 return new ReducingSink();
585             }
586         };
587     }
588 
589     /**
590      * Constructs a {@code TerminalOp} that counts the number of stream
591      * elements.  If the size of the pipeline is known then count is the size
592      * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
594      * {@link CountingSink}.
595      *
596      * @return a {@code TerminalOp} implementing the counting
597      */
598     public static TerminalOp<Long, Long>
599     makeLongCounting() {
600         return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) {
601             @Override
602             public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); }
603 
604             @Override
605             public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
606                                                   Spliterator<P_IN> spliterator) {
607                 long size = helper.exactOutputSizeIfKnown(spliterator);
608                 if (size != -1)
609                     return size;
610                 return super.evaluateSequential(helper, spliterator);
611             }
612 
613             @Override
614             public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
615                                                 Spliterator<P_IN> spliterator) {
616                 long size = helper.exactOutputSizeIfKnown(spliterator);
617                 if (size != -1)
618                     return size;
619                 return super.evaluateParallel(helper, spliterator);
620             }
621 
622             @Override
623             public int getOpFlags() {
624                 return StreamOpFlag.NOT_ORDERED;
625             }
626         };
627     }
628 
629     /**
630      * Constructs a {@code TerminalOp} that implements a functional reduce on
631      * {@code double} values.
632      *
633      * @param identity the identity for the combining function
634      * @param operator the combining function
635      * @return a {@code TerminalOp} implementing the reduction
636      */
637     public static TerminalOp<Double, Double>
638     makeDouble(double identity, DoubleBinaryOperator operator) {
639         Objects.requireNonNull(operator);
640         class ReducingSink
641                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
642             private double state;
643 
644             @Override
645             public void begin(long size) {
646                 state = identity;
647             }
648 
649             @Override
650             public void accept(double t) {
651                 state = operator.applyAsDouble(state, t);
652             }
653 
654             @Override
655             public Double get() {
656                 return state;
657             }
658 
659             @Override
660             public void combine(ReducingSink other) {
661                 accept(other.state);
662             }
663         }
664         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
665             @Override
666             public ReducingSink makeSink() {
667                 return new ReducingSink();
668             }
669         };
670     }
671 
672     /**
673      * Constructs a {@code TerminalOp} that implements a functional reduce on
674      * {@code double} values, producing an optional double result.
675      *
676      * @param operator the combining function
677      * @return a {@code TerminalOp} implementing the reduction
678      */
679     public static TerminalOp<Double, OptionalDouble>
680     makeDouble(DoubleBinaryOperator operator) {
681         Objects.requireNonNull(operator);
682         class ReducingSink
683                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
684             private boolean empty;
685             private double state;
686 
687             public void begin(long size) {
688                 empty = true;
689                 state = 0;
690             }
691 
692             @Override
693             public void accept(double t) {
694                 if (empty) {
695                     empty = false;
696                     state = t;
697                 }
698                 else {
699                     state = operator.applyAsDouble(state, t);
700                 }
701             }
702 
703             @Override
704             public OptionalDouble get() {
705                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
706             }
707 
708             @Override
709             public void combine(ReducingSink other) {
710                 if (!other.empty)
711                     accept(other.state);
712             }
713         }
714         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
715             @Override
716             public ReducingSink makeSink() {
717                 return new ReducingSink();
718             }
719         };
720     }
721 
722     /**
723      * Constructs a {@code TerminalOp} that implements a mutable reduce on
724      * {@code double} values.
725      *
726      * @param <R> the type of the result
727      * @param supplier a factory to produce a new accumulator of the result type
     * @param accumulator a function to incorporate a double into an
729      *        accumulator
730      * @param combiner a function to combine an accumulator into another
731      * @return a {@code TerminalOp} implementing the reduction
732      */
733     public static <R> TerminalOp<Double, R>
734     makeDouble(Supplier<R> supplier,
735                ObjDoubleConsumer<R> accumulator,
736                BinaryOperator<R> combiner) {
737         Objects.requireNonNull(supplier);
738         Objects.requireNonNull(accumulator);
739         Objects.requireNonNull(combiner);
740         class ReducingSink extends Box<R>
741                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
742             @Override
743             public void begin(long size) {
744                 state = supplier.get();
745             }
746 
747             @Override
748             public void accept(double t) {
749                 accumulator.accept(state, t);
750             }
751 
752             @Override
753             public void combine(ReducingSink other) {
754                 state = combiner.apply(state, other.state);
755             }
756         }
757         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
758             @Override
759             public ReducingSink makeSink() {
760                 return new ReducingSink();
761             }
762         };
763     }
764 
765     /**
766      * Constructs a {@code TerminalOp} that counts the number of stream
767      * elements.  If the size of the pipeline is known then count is the size
768      * and there is no need to evaluate the pipeline.  If the size of the
     * pipeline is not known then count is produced, via reduction, using a
770      * {@link CountingSink}.
771      *
772      * @return a {@code TerminalOp} implementing the counting
773      */
774     public static TerminalOp<Double, Long>
775     makeDoubleCounting() {
776         return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) {
777             @Override
778             public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); }
779 
780             @Override
781             public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
782                                                   Spliterator<P_IN> spliterator) {
783                 long size = helper.exactOutputSizeIfKnown(spliterator);
784                 if (size != -1)
785                     return size;
786                 return super.evaluateSequential(helper, spliterator);
787             }
788 
789             @Override
790             public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
791                                                 Spliterator<P_IN> spliterator) {
792                 long size = helper.exactOutputSizeIfKnown(spliterator);
793                 if (size != -1)
794                     return size;
795                 return super.evaluateParallel(helper, spliterator);
796             }
797 
798             @Override
799             public int getOpFlags() {
800                 return StreamOpFlag.NOT_ORDERED;
801             }
802         };
803     }
804 
805     /**
806      * A sink that counts elements
807      */
808     abstract static class CountingSink<T>
809             extends Box<Long>
810             implements AccumulatingSink<T, Long, CountingSink<T>> {
811         long count;
812 
813         @Override
814         public void begin(long size) {
815             count = 0L;
816         }
817 
818         @Override
819         public Long get() {
820             return count;
821         }
822 
823         @Override
824         public void combine(CountingSink<T> other) {
825             count += other.count;
826         }
827 
828         static final class OfRef<T> extends CountingSink<T> {
829             @Override
830             public void accept(T t) {
831                 count++;
832             }
833         }
834 
835         static final class OfInt extends CountingSink<Integer> implements Sink.OfInt {
836             @Override
837             public void accept(int t) {
838                 count++;
839             }
840         }
841 
842         static final class OfLong extends CountingSink<Long> implements Sink.OfLong {
843             @Override
844             public void accept(long t) {
845                 count++;
846             }
847         }
848 
849         static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble {
850             @Override
851             public void accept(double t) {
852                 count++;
853             }
854         }
855     }
856 
857     /**
858      * A type of {@code TerminalSink} that implements an associative reducing
859      * operation on elements of type {@code T} and producing a result of type
860      * {@code R}.
861      *
862      * @param <T> the type of input element to the combining operation
863      * @param <R> the result type
864      * @param <K> the type of the {@code AccumulatingSink}.
865      */
866     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
867             extends TerminalSink<T, R> {
868         void combine(K other);
869     }
870 
871     /**
872      * State box for a single state element, used as a base class for
873      * {@code AccumulatingSink} instances
874      *
875      * @param <U> The type of the state element
876      */
877     private abstract static class Box<U> {
878         U state;
879 
880         Box() {} // Avoid creation of special accessor
881 
882         public U get() {
883             return state;
884         }
885     }
886 
887     /**
888      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
889      * output into an {@code AccumulatingSink}, which performs a reduce
890      * operation. The {@code AccumulatingSink} must represent an associative
891      * reducing operation.
892      *
893      * @param <T> the output type of the stream pipeline
894      * @param <R> the result type of the reducing operation
895      * @param <S> the type of the {@code AccumulatingSink}
896      */
897     private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
898             implements TerminalOp<T, R> {
899         private final StreamShape inputShape;
900 
901         /**
902          * Create a {@code ReduceOp} of the specified stream shape which uses
903          * the specified {@code Supplier} to create accumulating sinks.
904          *
905          * @param shape The shape of the stream pipeline
906          */
907         ReduceOp(StreamShape shape) {
908             inputShape = shape;
909         }
910 
911         public abstract S makeSink();
912 
913         @Override
914         public StreamShape inputShape() {
915             return inputShape;
916         }
917 
918         @Override
919         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
920                                            Spliterator<P_IN> spliterator) {
921             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
922         }
923 
924         @Override
925         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
926                                          Spliterator<P_IN> spliterator) {
927             return new ReduceTask<>(this, helper, spliterator).invoke().get();
928         }
929     }
930 
931     /**
932      * A {@code ForkJoinTask} for performing a parallel reduce operation.
933      */
934     @SuppressWarnings("serial")
935     private static final class ReduceTask<P_IN, P_OUT, R,
936                                           S extends AccumulatingSink<P_OUT, R, S>>
937             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
938         private final ReduceOp<P_OUT, R, S> op;
939 
940         ReduceTask(ReduceOp<P_OUT, R, S> op,
941                    PipelineHelper<P_OUT> helper,
942                    Spliterator<P_IN> spliterator) {
943             super(helper, spliterator);
944             this.op = op;
945         }
946 
947         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
948                    Spliterator<P_IN> spliterator) {
949             super(parent, spliterator);
950             this.op = parent.op;
951         }
952 
953         @Override
954         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
955             return new ReduceTask<>(this, spliterator);
956         }
957 
958         @Override
959         protected S doLeaf() {
960             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
961         }
962 
963         @Override
964         public void onCompletion(CountedCompleter<?> caller) {
965             if (!isLeaf()) {
966                 S leftResult = leftChild.getLocalResult();
967                 leftResult.combine(rightChild.getLocalResult());
968                 setLocalResult(leftResult);
969             }
970             // GC spliterator, left and right child
971             super.onCompletion(caller);
972         }
973     }
974 }
975