1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Optional; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.OptionalLong; 32 import java.util.Spliterator; 33 import java.util.concurrent.CountedCompleter; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.DoubleBinaryOperator; 38 import java.util.function.IntBinaryOperator; 39 import java.util.function.LongBinaryOperator; 40 import java.util.function.ObjDoubleConsumer; 41 import java.util.function.ObjIntConsumer; 42 import java.util.function.ObjLongConsumer; 43 import java.util.function.Supplier; 44 45 /** 46 * Factory for creating instances of {@code TerminalOp} that implement 47 * reductions. 48 * 49 * @since 1.8 50 */ 51 final class ReduceOps { 52 ReduceOps()53 private ReduceOps() { } 54 55 /** 56 * Constructs a {@code TerminalOp} that implements a functional reduce on 57 * reference values. 58 * 59 * @param <T> the type of the input elements 60 * @param <U> the type of the result 61 * @param seed the identity element for the reduction 62 * @param reducer the accumulating function that incorporates an additional 63 * input element into the result 64 * @param combiner the combining function that combines two intermediate 65 * results 66 * @return a {@code TerminalOp} implementing the reduction 67 */ 68 public static <T, U> TerminalOp<T, U> makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner)69 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { 70 Objects.requireNonNull(reducer); 71 Objects.requireNonNull(combiner); 72 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { 73 @Override 74 public void begin(long size) { 75 state = seed; 76 } 77 78 @Override 79 public void accept(T t) { 80 state = reducer.apply(state, t); 81 } 82 83 @Override 84 public void combine(ReducingSink other) { 85 state = combiner.apply(state, other.state); 86 } 87 } 88 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { 89 @Override 90 public ReducingSink makeSink() { 91 return new ReducingSink(); 92 } 93 }; 94 } 95 96 /** 97 * Constructs a {@code TerminalOp} that implements a functional reduce on 98 * reference values producing an optional reference result. 99 * 100 * @param <T> The type of the input elements, and the type of the result 101 * @param operator The reducing function 102 * @return A {@code TerminalOp} implementing the reduction 103 */ 104 public static <T> TerminalOp<T, Optional<T>> 105 makeRef(BinaryOperator<T> operator) { 106 Objects.requireNonNull(operator); 107 class ReducingSink 108 implements AccumulatingSink<T, Optional<T>, ReducingSink> { 109 private boolean empty; 110 private T state; 111 112 public void begin(long size) { 113 empty = true; 114 state = null; 115 } 116 117 @Override 118 public void accept(T t) { 119 if (empty) { 120 empty = false; 121 state = t; 122 } else { 123 state = operator.apply(state, t); 124 } 125 } 126 127 @Override 128 public Optional<T> get() { 129 return empty ? Optional.empty() : Optional.of(state); 130 } 131 132 @Override 133 public void combine(ReducingSink other) { 134 if (!other.empty) 135 accept(other.state); 136 } 137 } 138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { 139 @Override 140 public ReducingSink makeSink() { 141 return new ReducingSink(); 142 } 143 }; 144 } 145 146 /** 147 * Constructs a {@code TerminalOp} that implements a mutable reduce on 148 * reference values. 149 * 150 * @param <T> the type of the input elements 151 * @param <I> the type of the intermediate reduction result 152 * @param collector a {@code Collector} defining the reduction 153 * @return a {@code ReduceOp} implementing the reduction 154 */ 155 public static <T, I> TerminalOp<T, I> 156 makeRef(Collector<? super T, I, ?> collector) { 157 Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); 158 BiConsumer<I, ? super T> accumulator = collector.accumulator(); 159 BinaryOperator<I> combiner = collector.combiner(); 160 class ReducingSink extends Box<I> 161 implements AccumulatingSink<T, I, ReducingSink> { 162 @Override 163 public void begin(long size) { 164 state = supplier.get(); 165 } 166 167 @Override 168 public void accept(T t) { 169 accumulator.accept(state, t); 170 } 171 172 @Override 173 public void combine(ReducingSink other) { 174 state = combiner.apply(state, other.state); 175 } 176 } 177 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { 178 @Override 179 public ReducingSink makeSink() { 180 return new ReducingSink(); 181 } 182 183 @Override 184 public int getOpFlags() { 185 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) 186 ? StreamOpFlag.NOT_ORDERED 187 : 0; 188 } 189 }; 190 } 191 192 /** 193 * Constructs a {@code TerminalOp} that implements a mutable reduce on 194 * reference values. 195 * 196 * @param <T> the type of the input elements 197 * @param <R> the type of the result 198 * @param seedFactory a factory to produce a new base accumulator 199 * @param accumulator a function to incorporate an element into an 200 * accumulator 201 * @param reducer a function to combine an accumulator into another 202 * @return a {@code TerminalOp} implementing the reduction 203 */ 204 public static <T, R> TerminalOp<T, R> 205 makeRef(Supplier<R> seedFactory, 206 BiConsumer<R, ? super T> accumulator, 207 BiConsumer<R,R> reducer) { 208 Objects.requireNonNull(seedFactory); 209 Objects.requireNonNull(accumulator); 210 Objects.requireNonNull(reducer); 211 class ReducingSink extends Box<R> 212 implements AccumulatingSink<T, R, ReducingSink> { 213 @Override 214 public void begin(long size) { 215 state = seedFactory.get(); 216 } 217 218 @Override 219 public void accept(T t) { 220 accumulator.accept(state, t); 221 } 222 223 @Override 224 public void combine(ReducingSink other) { 225 reducer.accept(state, other.state); 226 } 227 } 228 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 229 @Override 230 public ReducingSink makeSink() { 231 return new ReducingSink(); 232 } 233 }; 234 } 235 236 /** 237 * Constructs a {@code TerminalOp} that implements a functional reduce on 238 * {@code int} values. 239 * 240 * @param identity the identity for the combining function 241 * @param operator the combining function 242 * @return a {@code TerminalOp} implementing the reduction 243 */ 244 public static TerminalOp<Integer, Integer> 245 makeInt(int identity, IntBinaryOperator operator) { 246 Objects.requireNonNull(operator); 247 class ReducingSink 248 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { 249 private int state; 250 251 @Override 252 public void begin(long size) { 253 state = identity; 254 } 255 256 @Override 257 public void accept(int t) { 258 state = operator.applyAsInt(state, t); 259 } 260 261 @Override 262 public Integer get() { 263 return state; 264 } 265 266 @Override 267 public void combine(ReducingSink other) { 268 accept(other.state); 269 } 270 } 271 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { 272 @Override 273 public ReducingSink makeSink() { 274 return new ReducingSink(); 275 } 276 }; 277 } 278 279 /** 280 * Constructs a {@code TerminalOp} that implements a functional reduce on 281 * {@code int} values, producing an optional integer result. 282 * 283 * @param operator the combining function 284 * @return a {@code TerminalOp} implementing the reduction 285 */ 286 public static TerminalOp<Integer, OptionalInt> 287 makeInt(IntBinaryOperator operator) { 288 Objects.requireNonNull(operator); 289 class ReducingSink 290 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { 291 private boolean empty; 292 private int state; 293 294 public void begin(long size) { 295 empty = true; 296 state = 0; 297 } 298 299 @Override 300 public void accept(int t) { 301 if (empty) { 302 empty = false; 303 state = t; 304 } 305 else { 306 state = operator.applyAsInt(state, t); 307 } 308 } 309 310 @Override 311 public OptionalInt get() { 312 return empty ? OptionalInt.empty() : OptionalInt.of(state); 313 } 314 315 @Override 316 public void combine(ReducingSink other) { 317 if (!other.empty) 318 accept(other.state); 319 } 320 } 321 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { 322 @Override 323 public ReducingSink makeSink() { 324 return new ReducingSink(); 325 } 326 }; 327 } 328 329 /** 330 * Constructs a {@code TerminalOp} that implements a mutable reduce on 331 * {@code int} values. 332 * 333 * @param <R> The type of the result 334 * @param supplier a factory to produce a new accumulator of the result type 335 * @param accumulator a function to incorporate an int into an 336 * accumulator 337 * @param combiner a function to combine an accumulator into another 338 * @return A {@code ReduceOp} implementing the reduction 339 */ 340 public static <R> TerminalOp<Integer, R> 341 makeInt(Supplier<R> supplier, 342 ObjIntConsumer<R> accumulator, 343 BinaryOperator<R> combiner) { 344 Objects.requireNonNull(supplier); 345 Objects.requireNonNull(accumulator); 346 Objects.requireNonNull(combiner); 347 class ReducingSink extends Box<R> 348 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { 349 @Override 350 public void begin(long size) { 351 state = supplier.get(); 352 } 353 354 @Override 355 public void accept(int t) { 356 accumulator.accept(state, t); 357 } 358 359 @Override 360 public void combine(ReducingSink other) { 361 state = combiner.apply(state, other.state); 362 } 363 } 364 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { 365 @Override 366 public ReducingSink makeSink() { 367 return new ReducingSink(); 368 } 369 }; 370 } 371 372 /** 373 * Constructs a {@code TerminalOp} that implements a functional reduce on 374 * {@code long} values. 375 * 376 * @param identity the identity for the combining function 377 * @param operator the combining function 378 * @return a {@code TerminalOp} implementing the reduction 379 */ 380 public static TerminalOp<Long, Long> 381 makeLong(long identity, LongBinaryOperator operator) { 382 Objects.requireNonNull(operator); 383 class ReducingSink 384 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { 385 private long state; 386 387 @Override 388 public void begin(long size) { 389 state = identity; 390 } 391 392 @Override 393 public void accept(long t) { 394 state = operator.applyAsLong(state, t); 395 } 396 397 @Override 398 public Long get() { 399 return state; 400 } 401 402 @Override 403 public void combine(ReducingSink other) { 404 accept(other.state); 405 } 406 } 407 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { 408 @Override 409 public ReducingSink makeSink() { 410 return new ReducingSink(); 411 } 412 }; 413 } 414 415 /** 416 * Constructs a {@code TerminalOp} that implements a functional reduce on 417 * {@code long} values, producing an optional long result. 418 * 419 * @param operator the combining function 420 * @return a {@code TerminalOp} implementing the reduction 421 */ 422 public static TerminalOp<Long, OptionalLong> 423 makeLong(LongBinaryOperator operator) { 424 Objects.requireNonNull(operator); 425 class ReducingSink 426 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { 427 private boolean empty; 428 private long state; 429 430 public void begin(long size) { 431 empty = true; 432 state = 0; 433 } 434 435 @Override 436 public void accept(long t) { 437 if (empty) { 438 empty = false; 439 state = t; 440 } 441 else { 442 state = operator.applyAsLong(state, t); 443 } 444 } 445 446 @Override 447 public OptionalLong get() { 448 return empty ? OptionalLong.empty() : OptionalLong.of(state); 449 } 450 451 @Override 452 public void combine(ReducingSink other) { 453 if (!other.empty) 454 accept(other.state); 455 } 456 } 457 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { 458 @Override 459 public ReducingSink makeSink() { 460 return new ReducingSink(); 461 } 462 }; 463 } 464 465 /** 466 * Constructs a {@code TerminalOp} that implements a mutable reduce on 467 * {@code long} values. 468 * 469 * @param <R> the type of the result 470 * @param supplier a factory to produce a new accumulator of the result type 471 * @param accumulator a function to incorporate an int into an 472 * accumulator 473 * @param combiner a function to combine an accumulator into another 474 * @return a {@code TerminalOp} implementing the reduction 475 */ 476 public static <R> TerminalOp<Long, R> 477 makeLong(Supplier<R> supplier, 478 ObjLongConsumer<R> accumulator, 479 BinaryOperator<R> combiner) { 480 Objects.requireNonNull(supplier); 481 Objects.requireNonNull(accumulator); 482 Objects.requireNonNull(combiner); 483 class ReducingSink extends Box<R> 484 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { 485 @Override 486 public void begin(long size) { 487 state = supplier.get(); 488 } 489 490 @Override 491 public void accept(long t) { 492 accumulator.accept(state, t); 493 } 494 495 @Override 496 public void combine(ReducingSink other) { 497 state = combiner.apply(state, other.state); 498 } 499 } 500 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { 501 @Override 502 public ReducingSink makeSink() { 503 return new ReducingSink(); 504 } 505 }; 506 } 507 508 /** 509 * Constructs a {@code TerminalOp} that implements a functional reduce on 510 * {@code double} values. 511 * 512 * @param identity the identity for the combining function 513 * @param operator the combining function 514 * @return a {@code TerminalOp} implementing the reduction 515 */ 516 public static TerminalOp<Double, Double> 517 makeDouble(double identity, DoubleBinaryOperator operator) { 518 Objects.requireNonNull(operator); 519 class ReducingSink 520 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 521 private double state; 522 523 @Override 524 public void begin(long size) { 525 state = identity; 526 } 527 528 @Override 529 public void accept(double t) { 530 state = operator.applyAsDouble(state, t); 531 } 532 533 @Override 534 public Double get() { 535 return state; 536 } 537 538 @Override 539 public void combine(ReducingSink other) { 540 accept(other.state); 541 } 542 } 543 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 544 @Override 545 public ReducingSink makeSink() { 546 return new ReducingSink(); 547 } 548 }; 549 } 550 551 /** 552 * Constructs a {@code TerminalOp} that implements a functional reduce on 553 * {@code double} values, producing an optional double result. 554 * 555 * @param operator the combining function 556 * @return a {@code TerminalOp} implementing the reduction 557 */ 558 public static TerminalOp<Double, OptionalDouble> 559 makeDouble(DoubleBinaryOperator operator) { 560 Objects.requireNonNull(operator); 561 class ReducingSink 562 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { 563 private boolean empty; 564 private double state; 565 566 public void begin(long size) { 567 empty = true; 568 state = 0; 569 } 570 571 @Override 572 public void accept(double t) { 573 if (empty) { 574 empty = false; 575 state = t; 576 } 577 else { 578 state = operator.applyAsDouble(state, t); 579 } 580 } 581 582 @Override 583 public OptionalDouble get() { 584 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 585 } 586 587 @Override 588 public void combine(ReducingSink other) { 589 if (!other.empty) 590 accept(other.state); 591 } 592 } 593 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 594 @Override 595 public ReducingSink makeSink() { 596 return new ReducingSink(); 597 } 598 }; 599 } 600 601 /** 602 * Constructs a {@code TerminalOp} that implements a mutable reduce on 603 * {@code double} values. 604 * 605 * @param <R> the type of the result 606 * @param supplier a factory to produce a new accumulator of the result type 607 * @param accumulator a function to incorporate an int into an 608 * accumulator 609 * @param combiner a function to combine an accumulator into another 610 * @return a {@code TerminalOp} implementing the reduction 611 */ 612 public static <R> TerminalOp<Double, R> 613 makeDouble(Supplier<R> supplier, 614 ObjDoubleConsumer<R> accumulator, 615 BinaryOperator<R> combiner) { 616 Objects.requireNonNull(supplier); 617 Objects.requireNonNull(accumulator); 618 Objects.requireNonNull(combiner); 619 class ReducingSink extends Box<R> 620 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 621 @Override 622 public void begin(long size) { 623 state = supplier.get(); 624 } 625 626 @Override 627 public void accept(double t) { 628 accumulator.accept(state, t); 629 } 630 631 @Override 632 public void combine(ReducingSink other) { 633 state = combiner.apply(state, other.state); 634 } 635 } 636 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { 637 @Override 638 public ReducingSink makeSink() { 639 return new ReducingSink(); 640 } 641 }; 642 } 643 644 /** 645 * A type of {@code TerminalSink} that implements an associative reducing 646 * operation on elements of type {@code T} and producing a result of type 647 * {@code R}. 648 * 649 * @param <T> the type of input element to the combining operation 650 * @param <R> the result type 651 * @param <K> the type of the {@code AccumulatingSink}. 652 */ 653 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> 654 extends TerminalSink<T, R> { 655 public void combine(K other); 656 } 657 658 /** 659 * State box for a single state element, used as a base class for 660 * {@code AccumulatingSink} instances 661 * 662 * @param <U> The type of the state element 663 */ 664 private static abstract class Box<U> { 665 U state; 666 667 Box() {} // Avoid creation of special accessor 668 669 public U get() { 670 return state; 671 } 672 } 673 674 /** 675 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 676 * output into an {@code AccumulatingSink}, which performs a reduce 677 * operation. The {@code AccumulatingSink} must represent an associative 678 * reducing operation. 679 * 680 * @param <T> the output type of the stream pipeline 681 * @param <R> the result type of the reducing operation 682 * @param <S> the type of the {@code AccumulatingSink} 683 */ 684 private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> 685 implements TerminalOp<T, R> { 686 private final StreamShape inputShape; 687 688 /** 689 * Create a {@code ReduceOp} of the specified stream shape which uses 690 * the specified {@code Supplier} to create accumulating sinks. 691 * 692 * @param shape The shape of the stream pipeline 693 */ 694 ReduceOp(StreamShape shape) { 695 inputShape = shape; 696 } 697 698 public abstract S makeSink(); 699 700 @Override 701 public StreamShape inputShape() { 702 return inputShape; 703 } 704 705 @Override 706 public <P_IN> R evaluateSequential(PipelineHelper<T> helper, 707 Spliterator<P_IN> spliterator) { 708 return helper.wrapAndCopyInto(makeSink(), spliterator).get(); 709 } 710 711 @Override 712 public <P_IN> R evaluateParallel(PipelineHelper<T> helper, 713 Spliterator<P_IN> spliterator) { 714 return new ReduceTask<>(this, helper, spliterator).invoke().get(); 715 } 716 } 717 718 /** 719 * A {@code ForkJoinTask} for performing a parallel reduce operation. 720 */ 721 @SuppressWarnings("serial") 722 private static final class ReduceTask<P_IN, P_OUT, R, 723 S extends AccumulatingSink<P_OUT, R, S>> 724 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 725 private final ReduceOp<P_OUT, R, S> op; 726 727 ReduceTask(ReduceOp<P_OUT, R, S> op, 728 PipelineHelper<P_OUT> helper, 729 Spliterator<P_IN> spliterator) { 730 super(helper, spliterator); 731 this.op = op; 732 } 733 734 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, 735 Spliterator<P_IN> spliterator) { 736 super(parent, spliterator); 737 this.op = parent.op; 738 } 739 740 @Override 741 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 742 return new ReduceTask<>(this, spliterator); 743 } 744 745 @Override 746 protected S doLeaf() { 747 return helper.wrapAndCopyInto(op.makeSink(), spliterator); 748 } 749 750 @Override 751 public void onCompletion(CountedCompleter<?> caller) { 752 if (!isLeaf()) { 753 S leftResult = leftChild.getLocalResult(); 754 leftResult.combine(rightChild.getLocalResult()); 755 setLocalResult(leftResult); 756 } 757 // GC spliterator, left and right child 758 super.onCompletion(caller); 759 } 760 } 761 } 762