1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Objects;
28 import java.util.Optional;
29 import java.util.OptionalDouble;
30 import java.util.OptionalInt;
31 import java.util.OptionalLong;
32 import java.util.Spliterator;
33 import java.util.concurrent.CountedCompleter;
34 import java.util.function.BiConsumer;
35 import java.util.function.BiFunction;
36 import java.util.function.BinaryOperator;
37 import java.util.function.DoubleBinaryOperator;
38 import java.util.function.IntBinaryOperator;
39 import java.util.function.LongBinaryOperator;
40 import java.util.function.ObjDoubleConsumer;
41 import java.util.function.ObjIntConsumer;
42 import java.util.function.ObjLongConsumer;
43 import java.util.function.Supplier;
44 
45 /**
46  * Factory for creating instances of {@code TerminalOp} that implement
47  * reductions.
48  *
49  * @since 1.8
50  */
51 final class ReduceOps {
52 
ReduceOps()53     private ReduceOps() { }
54 
55     /**
56      * Constructs a {@code TerminalOp} that implements a functional reduce on
57      * reference values.
58      *
59      * @param <T> the type of the input elements
60      * @param <U> the type of the result
61      * @param seed the identity element for the reduction
62      * @param reducer the accumulating function that incorporates an additional
63      *        input element into the result
64      * @param combiner the combining function that combines two intermediate
65      *        results
66      * @return a {@code TerminalOp} implementing the reduction
67      */
68     public static <T, U> TerminalOp<T, U>
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner)69     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
70         Objects.requireNonNull(reducer);
71         Objects.requireNonNull(combiner);
72         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
73             @Override
74             public void begin(long size) {
75                 state = seed;
76             }
77 
78             @Override
79             public void accept(T t) {
80                 state = reducer.apply(state, t);
81             }
82 
83             @Override
84             public void combine(ReducingSink other) {
85                 state = combiner.apply(state, other.state);
86             }
87         }
88         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
89             @Override
90             public ReducingSink makeSink() {
91                 return new ReducingSink();
92             }
93         };
94     }
95 
96     /**
97      * Constructs a {@code TerminalOp} that implements a functional reduce on
98      * reference values producing an optional reference result.
99      *
100      * @param <T> The type of the input elements, and the type of the result
101      * @param operator The reducing function
102      * @return A {@code TerminalOp} implementing the reduction
103      */
104     public static <T> TerminalOp<T, Optional<T>>
105     makeRef(BinaryOperator<T> operator) {
106         Objects.requireNonNull(operator);
107         class ReducingSink
108                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
109             private boolean empty;
110             private T state;
111 
112             public void begin(long size) {
113                 empty = true;
114                 state = null;
115             }
116 
117             @Override
118             public void accept(T t) {
119                 if (empty) {
120                     empty = false;
121                     state = t;
122                 } else {
123                     state = operator.apply(state, t);
124                 }
125             }
126 
127             @Override
128             public Optional<T> get() {
129                 return empty ? Optional.empty() : Optional.of(state);
130             }
131 
132             @Override
133             public void combine(ReducingSink other) {
134                 if (!other.empty)
135                     accept(other.state);
136             }
137         }
138         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
139             @Override
140             public ReducingSink makeSink() {
141                 return new ReducingSink();
142             }
143         };
144     }
145 
146     /**
147      * Constructs a {@code TerminalOp} that implements a mutable reduce on
148      * reference values.
149      *
150      * @param <T> the type of the input elements
151      * @param <I> the type of the intermediate reduction result
152      * @param collector a {@code Collector} defining the reduction
153      * @return a {@code ReduceOp} implementing the reduction
154      */
155     public static <T, I> TerminalOp<T, I>
156     makeRef(Collector<? super T, I, ?> collector) {
157         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
158         BiConsumer<I, ? super T> accumulator = collector.accumulator();
159         BinaryOperator<I> combiner = collector.combiner();
160         class ReducingSink extends Box<I>
161                 implements AccumulatingSink<T, I, ReducingSink> {
162             @Override
163             public void begin(long size) {
164                 state = supplier.get();
165             }
166 
167             @Override
168             public void accept(T t) {
169                 accumulator.accept(state, t);
170             }
171 
172             @Override
173             public void combine(ReducingSink other) {
174                 state = combiner.apply(state, other.state);
175             }
176         }
177         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
178             @Override
179             public ReducingSink makeSink() {
180                 return new ReducingSink();
181             }
182 
183             @Override
184             public int getOpFlags() {
185                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
186                        ? StreamOpFlag.NOT_ORDERED
187                        : 0;
188             }
189         };
190     }
191 
192     /**
193      * Constructs a {@code TerminalOp} that implements a mutable reduce on
194      * reference values.
195      *
196      * @param <T> the type of the input elements
197      * @param <R> the type of the result
198      * @param seedFactory a factory to produce a new base accumulator
199      * @param accumulator a function to incorporate an element into an
200      *        accumulator
201      * @param reducer a function to combine an accumulator into another
202      * @return a {@code TerminalOp} implementing the reduction
203      */
204     public static <T, R> TerminalOp<T, R>
205     makeRef(Supplier<R> seedFactory,
206             BiConsumer<R, ? super T> accumulator,
207             BiConsumer<R,R> reducer) {
208         Objects.requireNonNull(seedFactory);
209         Objects.requireNonNull(accumulator);
210         Objects.requireNonNull(reducer);
211         class ReducingSink extends Box<R>
212                 implements AccumulatingSink<T, R, ReducingSink> {
213             @Override
214             public void begin(long size) {
215                 state = seedFactory.get();
216             }
217 
218             @Override
219             public void accept(T t) {
220                 accumulator.accept(state, t);
221             }
222 
223             @Override
224             public void combine(ReducingSink other) {
225                 reducer.accept(state, other.state);
226             }
227         }
228         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
229             @Override
230             public ReducingSink makeSink() {
231                 return new ReducingSink();
232             }
233         };
234     }
235 
236     /**
237      * Constructs a {@code TerminalOp} that implements a functional reduce on
238      * {@code int} values.
239      *
240      * @param identity the identity for the combining function
241      * @param operator the combining function
242      * @return a {@code TerminalOp} implementing the reduction
243      */
244     public static TerminalOp<Integer, Integer>
245     makeInt(int identity, IntBinaryOperator operator) {
246         Objects.requireNonNull(operator);
247         class ReducingSink
248                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
249             private int state;
250 
251             @Override
252             public void begin(long size) {
253                 state = identity;
254             }
255 
256             @Override
257             public void accept(int t) {
258                 state = operator.applyAsInt(state, t);
259             }
260 
261             @Override
262             public Integer get() {
263                 return state;
264             }
265 
266             @Override
267             public void combine(ReducingSink other) {
268                 accept(other.state);
269             }
270         }
271         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
272             @Override
273             public ReducingSink makeSink() {
274                 return new ReducingSink();
275             }
276         };
277     }
278 
279     /**
280      * Constructs a {@code TerminalOp} that implements a functional reduce on
281      * {@code int} values, producing an optional integer result.
282      *
283      * @param operator the combining function
284      * @return a {@code TerminalOp} implementing the reduction
285      */
286     public static TerminalOp<Integer, OptionalInt>
287     makeInt(IntBinaryOperator operator) {
288         Objects.requireNonNull(operator);
289         class ReducingSink
290                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
291             private boolean empty;
292             private int state;
293 
294             public void begin(long size) {
295                 empty = true;
296                 state = 0;
297             }
298 
299             @Override
300             public void accept(int t) {
301                 if (empty) {
302                     empty = false;
303                     state = t;
304                 }
305                 else {
306                     state = operator.applyAsInt(state, t);
307                 }
308             }
309 
310             @Override
311             public OptionalInt get() {
312                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
313             }
314 
315             @Override
316             public void combine(ReducingSink other) {
317                 if (!other.empty)
318                     accept(other.state);
319             }
320         }
321         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
322             @Override
323             public ReducingSink makeSink() {
324                 return new ReducingSink();
325             }
326         };
327     }
328 
329     /**
330      * Constructs a {@code TerminalOp} that implements a mutable reduce on
331      * {@code int} values.
332      *
333      * @param <R> The type of the result
334      * @param supplier a factory to produce a new accumulator of the result type
335      * @param accumulator a function to incorporate an int into an
336      *        accumulator
337      * @param combiner a function to combine an accumulator into another
338      * @return A {@code ReduceOp} implementing the reduction
339      */
340     public static <R> TerminalOp<Integer, R>
341     makeInt(Supplier<R> supplier,
342             ObjIntConsumer<R> accumulator,
343             BinaryOperator<R> combiner) {
344         Objects.requireNonNull(supplier);
345         Objects.requireNonNull(accumulator);
346         Objects.requireNonNull(combiner);
347         class ReducingSink extends Box<R>
348                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
349             @Override
350             public void begin(long size) {
351                 state = supplier.get();
352             }
353 
354             @Override
355             public void accept(int t) {
356                 accumulator.accept(state, t);
357             }
358 
359             @Override
360             public void combine(ReducingSink other) {
361                 state = combiner.apply(state, other.state);
362             }
363         }
364         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
365             @Override
366             public ReducingSink makeSink() {
367                 return new ReducingSink();
368             }
369         };
370     }
371 
372     /**
373      * Constructs a {@code TerminalOp} that implements a functional reduce on
374      * {@code long} values.
375      *
376      * @param identity the identity for the combining function
377      * @param operator the combining function
378      * @return a {@code TerminalOp} implementing the reduction
379      */
380     public static TerminalOp<Long, Long>
381     makeLong(long identity, LongBinaryOperator operator) {
382         Objects.requireNonNull(operator);
383         class ReducingSink
384                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
385             private long state;
386 
387             @Override
388             public void begin(long size) {
389                 state = identity;
390             }
391 
392             @Override
393             public void accept(long t) {
394                 state = operator.applyAsLong(state, t);
395             }
396 
397             @Override
398             public Long get() {
399                 return state;
400             }
401 
402             @Override
403             public void combine(ReducingSink other) {
404                 accept(other.state);
405             }
406         }
407         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
408             @Override
409             public ReducingSink makeSink() {
410                 return new ReducingSink();
411             }
412         };
413     }
414 
415     /**
416      * Constructs a {@code TerminalOp} that implements a functional reduce on
417      * {@code long} values, producing an optional long result.
418      *
419      * @param operator the combining function
420      * @return a {@code TerminalOp} implementing the reduction
421      */
422     public static TerminalOp<Long, OptionalLong>
423     makeLong(LongBinaryOperator operator) {
424         Objects.requireNonNull(operator);
425         class ReducingSink
426                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
427             private boolean empty;
428             private long state;
429 
430             public void begin(long size) {
431                 empty = true;
432                 state = 0;
433             }
434 
435             @Override
436             public void accept(long t) {
437                 if (empty) {
438                     empty = false;
439                     state = t;
440                 }
441                 else {
442                     state = operator.applyAsLong(state, t);
443                 }
444             }
445 
446             @Override
447             public OptionalLong get() {
448                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
449             }
450 
451             @Override
452             public void combine(ReducingSink other) {
453                 if (!other.empty)
454                     accept(other.state);
455             }
456         }
457         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
458             @Override
459             public ReducingSink makeSink() {
460                 return new ReducingSink();
461             }
462         };
463     }
464 
465     /**
466      * Constructs a {@code TerminalOp} that implements a mutable reduce on
467      * {@code long} values.
468      *
469      * @param <R> the type of the result
470      * @param supplier a factory to produce a new accumulator of the result type
471      * @param accumulator a function to incorporate an int into an
472      *        accumulator
473      * @param combiner a function to combine an accumulator into another
474      * @return a {@code TerminalOp} implementing the reduction
475      */
476     public static <R> TerminalOp<Long, R>
477     makeLong(Supplier<R> supplier,
478              ObjLongConsumer<R> accumulator,
479              BinaryOperator<R> combiner) {
480         Objects.requireNonNull(supplier);
481         Objects.requireNonNull(accumulator);
482         Objects.requireNonNull(combiner);
483         class ReducingSink extends Box<R>
484                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
485             @Override
486             public void begin(long size) {
487                 state = supplier.get();
488             }
489 
490             @Override
491             public void accept(long t) {
492                 accumulator.accept(state, t);
493             }
494 
495             @Override
496             public void combine(ReducingSink other) {
497                 state = combiner.apply(state, other.state);
498             }
499         }
500         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
501             @Override
502             public ReducingSink makeSink() {
503                 return new ReducingSink();
504             }
505         };
506     }
507 
508     /**
509      * Constructs a {@code TerminalOp} that implements a functional reduce on
510      * {@code double} values.
511      *
512      * @param identity the identity for the combining function
513      * @param operator the combining function
514      * @return a {@code TerminalOp} implementing the reduction
515      */
516     public static TerminalOp<Double, Double>
517     makeDouble(double identity, DoubleBinaryOperator operator) {
518         Objects.requireNonNull(operator);
519         class ReducingSink
520                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
521             private double state;
522 
523             @Override
524             public void begin(long size) {
525                 state = identity;
526             }
527 
528             @Override
529             public void accept(double t) {
530                 state = operator.applyAsDouble(state, t);
531             }
532 
533             @Override
534             public Double get() {
535                 return state;
536             }
537 
538             @Override
539             public void combine(ReducingSink other) {
540                 accept(other.state);
541             }
542         }
543         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
544             @Override
545             public ReducingSink makeSink() {
546                 return new ReducingSink();
547             }
548         };
549     }
550 
551     /**
552      * Constructs a {@code TerminalOp} that implements a functional reduce on
553      * {@code double} values, producing an optional double result.
554      *
555      * @param operator the combining function
556      * @return a {@code TerminalOp} implementing the reduction
557      */
558     public static TerminalOp<Double, OptionalDouble>
559     makeDouble(DoubleBinaryOperator operator) {
560         Objects.requireNonNull(operator);
561         class ReducingSink
562                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
563             private boolean empty;
564             private double state;
565 
566             public void begin(long size) {
567                 empty = true;
568                 state = 0;
569             }
570 
571             @Override
572             public void accept(double t) {
573                 if (empty) {
574                     empty = false;
575                     state = t;
576                 }
577                 else {
578                     state = operator.applyAsDouble(state, t);
579                 }
580             }
581 
582             @Override
583             public OptionalDouble get() {
584                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
585             }
586 
587             @Override
588             public void combine(ReducingSink other) {
589                 if (!other.empty)
590                     accept(other.state);
591             }
592         }
593         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
594             @Override
595             public ReducingSink makeSink() {
596                 return new ReducingSink();
597             }
598         };
599     }
600 
601     /**
602      * Constructs a {@code TerminalOp} that implements a mutable reduce on
603      * {@code double} values.
604      *
605      * @param <R> the type of the result
606      * @param supplier a factory to produce a new accumulator of the result type
607      * @param accumulator a function to incorporate an int into an
608      *        accumulator
609      * @param combiner a function to combine an accumulator into another
610      * @return a {@code TerminalOp} implementing the reduction
611      */
612     public static <R> TerminalOp<Double, R>
613     makeDouble(Supplier<R> supplier,
614                ObjDoubleConsumer<R> accumulator,
615                BinaryOperator<R> combiner) {
616         Objects.requireNonNull(supplier);
617         Objects.requireNonNull(accumulator);
618         Objects.requireNonNull(combiner);
619         class ReducingSink extends Box<R>
620                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
621             @Override
622             public void begin(long size) {
623                 state = supplier.get();
624             }
625 
626             @Override
627             public void accept(double t) {
628                 accumulator.accept(state, t);
629             }
630 
631             @Override
632             public void combine(ReducingSink other) {
633                 state = combiner.apply(state, other.state);
634             }
635         }
636         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
637             @Override
638             public ReducingSink makeSink() {
639                 return new ReducingSink();
640             }
641         };
642     }
643 
644     /**
645      * A type of {@code TerminalSink} that implements an associative reducing
646      * operation on elements of type {@code T} and producing a result of type
647      * {@code R}.
648      *
649      * @param <T> the type of input element to the combining operation
650      * @param <R> the result type
651      * @param <K> the type of the {@code AccumulatingSink}.
652      */
653     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
654             extends TerminalSink<T, R> {
655         public void combine(K other);
656     }
657 
658     /**
659      * State box for a single state element, used as a base class for
660      * {@code AccumulatingSink} instances
661      *
662      * @param <U> The type of the state element
663      */
664     private static abstract class Box<U> {
665         U state;
666 
667         Box() {} // Avoid creation of special accessor
668 
669         public U get() {
670             return state;
671         }
672     }
673 
674     /**
675      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
676      * output into an {@code AccumulatingSink}, which performs a reduce
677      * operation. The {@code AccumulatingSink} must represent an associative
678      * reducing operation.
679      *
680      * @param <T> the output type of the stream pipeline
681      * @param <R> the result type of the reducing operation
682      * @param <S> the type of the {@code AccumulatingSink}
683      */
684     private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
685             implements TerminalOp<T, R> {
686         private final StreamShape inputShape;
687 
688         /**
689          * Create a {@code ReduceOp} of the specified stream shape which uses
690          * the specified {@code Supplier} to create accumulating sinks.
691          *
692          * @param shape The shape of the stream pipeline
693          */
694         ReduceOp(StreamShape shape) {
695             inputShape = shape;
696         }
697 
698         public abstract S makeSink();
699 
700         @Override
701         public StreamShape inputShape() {
702             return inputShape;
703         }
704 
705         @Override
706         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
707                                            Spliterator<P_IN> spliterator) {
708             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
709         }
710 
711         @Override
712         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
713                                          Spliterator<P_IN> spliterator) {
714             return new ReduceTask<>(this, helper, spliterator).invoke().get();
715         }
716     }
717 
718     /**
719      * A {@code ForkJoinTask} for performing a parallel reduce operation.
720      */
721     @SuppressWarnings("serial")
722     private static final class ReduceTask<P_IN, P_OUT, R,
723                                           S extends AccumulatingSink<P_OUT, R, S>>
724             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
725         private final ReduceOp<P_OUT, R, S> op;
726 
727         ReduceTask(ReduceOp<P_OUT, R, S> op,
728                    PipelineHelper<P_OUT> helper,
729                    Spliterator<P_IN> spliterator) {
730             super(helper, spliterator);
731             this.op = op;
732         }
733 
734         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
735                    Spliterator<P_IN> spliterator) {
736             super(parent, spliterator);
737             this.op = parent.op;
738         }
739 
740         @Override
741         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
742             return new ReduceTask<>(this, spliterator);
743         }
744 
745         @Override
746         protected S doLeaf() {
747             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
748         }
749 
750         @Override
751         public void onCompletion(CountedCompleter<?> caller) {
752             if (!isLeaf()) {
753                 S leftResult = leftChild.getLocalResult();
754                 leftResult.combine(rightChild.getLocalResult());
755                 setLocalResult(leftResult);
756             }
757             // GC spliterator, left and right child
758             super.onCompletion(caller);
759         }
760     }
761 }
762