1 /* 2 * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Objects; 28 import java.util.Optional; 29 import java.util.OptionalDouble; 30 import java.util.OptionalInt; 31 import java.util.OptionalLong; 32 import java.util.Spliterator; 33 import java.util.concurrent.CountedCompleter; 34 import java.util.function.BiConsumer; 35 import java.util.function.BiFunction; 36 import java.util.function.BinaryOperator; 37 import java.util.function.DoubleBinaryOperator; 38 import java.util.function.IntBinaryOperator; 39 import java.util.function.LongBinaryOperator; 40 import java.util.function.ObjDoubleConsumer; 41 import java.util.function.ObjIntConsumer; 42 import java.util.function.ObjLongConsumer; 43 import java.util.function.Supplier; 44 45 /** 46 * Factory for creating instances of {@code TerminalOp} that implement 47 * reductions. 48 * 49 * @since 1.8 50 */ 51 final class ReduceOps { 52 ReduceOps()53 private ReduceOps() { } 54 55 /** 56 * Constructs a {@code TerminalOp} that implements a functional reduce on 57 * reference values. 58 * 59 * @param <T> the type of the input elements 60 * @param <U> the type of the result 61 * @param seed the identity element for the reduction 62 * @param reducer the accumulating function that incorporates an additional 63 * input element into the result 64 * @param combiner the combining function that combines two intermediate 65 * results 66 * @return a {@code TerminalOp} implementing the reduction 67 */ 68 public static <T, U> TerminalOp<T, U> makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner)69 makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { 70 Objects.requireNonNull(reducer); 71 Objects.requireNonNull(combiner); 72 class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { 73 @Override 74 public void begin(long size) { 75 state = seed; 76 } 77 78 @Override 79 public void accept(T t) { 80 state = reducer.apply(state, t); 81 } 82 83 @Override 84 public void combine(ReducingSink other) { 85 state = combiner.apply(state, other.state); 86 } 87 } 88 return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { 89 @Override 90 public ReducingSink makeSink() { 91 return new ReducingSink(); 92 } 93 }; 94 } 95 96 /** 97 * Constructs a {@code TerminalOp} that implements a functional reduce on 98 * reference values producing an optional reference result. 99 * 100 * @param <T> The type of the input elements, and the type of the result 101 * @param operator The reducing function 102 * @return A {@code TerminalOp} implementing the reduction 103 */ 104 public static <T> TerminalOp<T, Optional<T>> 105 makeRef(BinaryOperator<T> operator) { 106 Objects.requireNonNull(operator); 107 class ReducingSink 108 implements AccumulatingSink<T, Optional<T>, ReducingSink> { 109 private boolean empty; 110 private T state; 111 112 public void begin(long size) { 113 empty = true; 114 state = null; 115 } 116 117 @Override 118 public void accept(T t) { 119 if (empty) { 120 empty = false; 121 state = t; 122 } else { 123 state = operator.apply(state, t); 124 } 125 } 126 127 @Override 128 public Optional<T> get() { 129 return empty ? Optional.empty() : Optional.of(state); 130 } 131 132 @Override 133 public void combine(ReducingSink other) { 134 if (!other.empty) 135 accept(other.state); 136 } 137 } 138 return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { 139 @Override 140 public ReducingSink makeSink() { 141 return new ReducingSink(); 142 } 143 }; 144 } 145 146 /** 147 * Constructs a {@code TerminalOp} that implements a mutable reduce on 148 * reference values. 149 * 150 * @param <T> the type of the input elements 151 * @param <I> the type of the intermediate reduction result 152 * @param collector a {@code Collector} defining the reduction 153 * @return a {@code ReduceOp} implementing the reduction 154 */ 155 public static <T, I> TerminalOp<T, I> 156 makeRef(Collector<? super T, I, ?> collector) { 157 Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); 158 BiConsumer<I, ? super T> accumulator = collector.accumulator(); 159 BinaryOperator<I> combiner = collector.combiner(); 160 class ReducingSink extends Box<I> 161 implements AccumulatingSink<T, I, ReducingSink> { 162 @Override 163 public void begin(long size) { 164 state = supplier.get(); 165 } 166 167 @Override 168 public void accept(T t) { 169 accumulator.accept(state, t); 170 } 171 172 @Override 173 public void combine(ReducingSink other) { 174 state = combiner.apply(state, other.state); 175 } 176 } 177 return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { 178 @Override 179 public ReducingSink makeSink() { 180 return new ReducingSink(); 181 } 182 183 @Override 184 public int getOpFlags() { 185 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) 186 ? StreamOpFlag.NOT_ORDERED 187 : 0; 188 } 189 }; 190 } 191 192 /** 193 * Constructs a {@code TerminalOp} that implements a mutable reduce on 194 * reference values. 195 * 196 * @param <T> the type of the input elements 197 * @param <R> the type of the result 198 * @param seedFactory a factory to produce a new base accumulator 199 * @param accumulator a function to incorporate an element into an 200 * accumulator 201 * @param reducer a function to combine an accumulator into another 202 * @return a {@code TerminalOp} implementing the reduction 203 */ 204 public static <T, R> TerminalOp<T, R> 205 makeRef(Supplier<R> seedFactory, 206 BiConsumer<R, ? super T> accumulator, 207 BiConsumer<R,R> reducer) { 208 Objects.requireNonNull(seedFactory); 209 Objects.requireNonNull(accumulator); 210 Objects.requireNonNull(reducer); 211 class ReducingSink extends Box<R> 212 implements AccumulatingSink<T, R, ReducingSink> { 213 @Override 214 public void begin(long size) { 215 state = seedFactory.get(); 216 } 217 218 @Override 219 public void accept(T t) { 220 accumulator.accept(state, t); 221 } 222 223 @Override 224 public void combine(ReducingSink other) { 225 reducer.accept(state, other.state); 226 } 227 } 228 return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { 229 @Override 230 public ReducingSink makeSink() { 231 return new ReducingSink(); 232 } 233 }; 234 } 235 236 /** 237 * Constructs a {@code TerminalOp} that counts the number of stream 238 * elements. If the size of the pipeline is known then count is the size 239 * and there is no need to evaluate the pipeline. If the size of the 240 * pipeline is non known then count is produced, via reduction, using a 241 * {@link CountingSink}. 242 * 243 * @param <T> the type of the input elements 244 * @return a {@code TerminalOp} implementing the counting 245 */ 246 public static <T> TerminalOp<T, Long> 247 makeRefCounting() { 248 return new ReduceOp<T, Long, CountingSink<T>>(StreamShape.REFERENCE) { 249 @Override 250 public CountingSink<T> makeSink() { return new CountingSink.OfRef<>(); } 251 252 @Override 253 public <P_IN> Long evaluateSequential(PipelineHelper<T> helper, 254 Spliterator<P_IN> spliterator) { 255 long size = helper.exactOutputSizeIfKnown(spliterator); 256 if (size != -1) 257 return size; 258 return super.evaluateSequential(helper, spliterator); 259 } 260 261 @Override 262 public <P_IN> Long evaluateParallel(PipelineHelper<T> helper, 263 Spliterator<P_IN> spliterator) { 264 long size = helper.exactOutputSizeIfKnown(spliterator); 265 if (size != -1) 266 return size; 267 return super.evaluateParallel(helper, spliterator); 268 } 269 270 @Override 271 public int getOpFlags() { 272 return StreamOpFlag.NOT_ORDERED; 273 } 274 }; 275 } 276 277 /** 278 * Constructs a {@code TerminalOp} that implements a functional reduce on 279 * {@code int} values. 280 * 281 * @param identity the identity for the combining function 282 * @param operator the combining function 283 * @return a {@code TerminalOp} implementing the reduction 284 */ 285 public static TerminalOp<Integer, Integer> 286 makeInt(int identity, IntBinaryOperator operator) { 287 Objects.requireNonNull(operator); 288 class ReducingSink 289 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { 290 private int state; 291 292 @Override 293 public void begin(long size) { 294 state = identity; 295 } 296 297 @Override 298 public void accept(int t) { 299 state = operator.applyAsInt(state, t); 300 } 301 302 @Override 303 public Integer get() { 304 return state; 305 } 306 307 @Override 308 public void combine(ReducingSink other) { 309 accept(other.state); 310 } 311 } 312 return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { 313 @Override 314 public ReducingSink makeSink() { 315 return new ReducingSink(); 316 } 317 }; 318 } 319 320 /** 321 * Constructs a {@code TerminalOp} that implements a functional reduce on 322 * {@code int} values, producing an optional integer result. 323 * 324 * @param operator the combining function 325 * @return a {@code TerminalOp} implementing the reduction 326 */ 327 public static TerminalOp<Integer, OptionalInt> 328 makeInt(IntBinaryOperator operator) { 329 Objects.requireNonNull(operator); 330 class ReducingSink 331 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { 332 private boolean empty; 333 private int state; 334 335 public void begin(long size) { 336 empty = true; 337 state = 0; 338 } 339 340 @Override 341 public void accept(int t) { 342 if (empty) { 343 empty = false; 344 state = t; 345 } 346 else { 347 state = operator.applyAsInt(state, t); 348 } 349 } 350 351 @Override 352 public OptionalInt get() { 353 return empty ? OptionalInt.empty() : OptionalInt.of(state); 354 } 355 356 @Override 357 public void combine(ReducingSink other) { 358 if (!other.empty) 359 accept(other.state); 360 } 361 } 362 return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { 363 @Override 364 public ReducingSink makeSink() { 365 return new ReducingSink(); 366 } 367 }; 368 } 369 370 /** 371 * Constructs a {@code TerminalOp} that implements a mutable reduce on 372 * {@code int} values. 373 * 374 * @param <R> The type of the result 375 * @param supplier a factory to produce a new accumulator of the result type 376 * @param accumulator a function to incorporate an int into an 377 * accumulator 378 * @param combiner a function to combine an accumulator into another 379 * @return A {@code ReduceOp} implementing the reduction 380 */ 381 public static <R> TerminalOp<Integer, R> 382 makeInt(Supplier<R> supplier, 383 ObjIntConsumer<R> accumulator, 384 BinaryOperator<R> combiner) { 385 Objects.requireNonNull(supplier); 386 Objects.requireNonNull(accumulator); 387 Objects.requireNonNull(combiner); 388 class ReducingSink extends Box<R> 389 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { 390 @Override 391 public void begin(long size) { 392 state = supplier.get(); 393 } 394 395 @Override 396 public void accept(int t) { 397 accumulator.accept(state, t); 398 } 399 400 @Override 401 public void combine(ReducingSink other) { 402 state = combiner.apply(state, other.state); 403 } 404 } 405 return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { 406 @Override 407 public ReducingSink makeSink() { 408 return new ReducingSink(); 409 } 410 }; 411 } 412 413 /** 414 * Constructs a {@code TerminalOp} that counts the number of stream 415 * elements. If the size of the pipeline is known then count is the size 416 * and there is no need to evaluate the pipeline. If the size of the 417 * pipeline is non known then count is produced, via reduction, using a 418 * {@link CountingSink}. 419 * 420 * @return a {@code TerminalOp} implementing the counting 421 */ 422 public static TerminalOp<Integer, Long> 423 makeIntCounting() { 424 return new ReduceOp<Integer, Long, CountingSink<Integer>>(StreamShape.INT_VALUE) { 425 @Override 426 public CountingSink<Integer> makeSink() { return new CountingSink.OfInt(); } 427 428 @Override 429 public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper, 430 Spliterator<P_IN> spliterator) { 431 long size = helper.exactOutputSizeIfKnown(spliterator); 432 if (size != -1) 433 return size; 434 return super.evaluateSequential(helper, spliterator); 435 } 436 437 @Override 438 public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper, 439 Spliterator<P_IN> spliterator) { 440 long size = helper.exactOutputSizeIfKnown(spliterator); 441 if (size != -1) 442 return size; 443 return super.evaluateParallel(helper, spliterator); 444 } 445 446 @Override 447 public int getOpFlags() { 448 return StreamOpFlag.NOT_ORDERED; 449 } 450 }; 451 } 452 453 /** 454 * Constructs a {@code TerminalOp} that implements a functional reduce on 455 * {@code long} values. 456 * 457 * @param identity the identity for the combining function 458 * @param operator the combining function 459 * @return a {@code TerminalOp} implementing the reduction 460 */ 461 public static TerminalOp<Long, Long> 462 makeLong(long identity, LongBinaryOperator operator) { 463 Objects.requireNonNull(operator); 464 class ReducingSink 465 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { 466 private long state; 467 468 @Override 469 public void begin(long size) { 470 state = identity; 471 } 472 473 @Override 474 public void accept(long t) { 475 state = operator.applyAsLong(state, t); 476 } 477 478 @Override 479 public Long get() { 480 return state; 481 } 482 483 @Override 484 public void combine(ReducingSink other) { 485 accept(other.state); 486 } 487 } 488 return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { 489 @Override 490 public ReducingSink makeSink() { 491 return new ReducingSink(); 492 } 493 }; 494 } 495 496 /** 497 * Constructs a {@code TerminalOp} that implements a functional reduce on 498 * {@code long} values, producing an optional long result. 499 * 500 * @param operator the combining function 501 * @return a {@code TerminalOp} implementing the reduction 502 */ 503 public static TerminalOp<Long, OptionalLong> 504 makeLong(LongBinaryOperator operator) { 505 Objects.requireNonNull(operator); 506 class ReducingSink 507 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { 508 private boolean empty; 509 private long state; 510 511 public void begin(long size) { 512 empty = true; 513 state = 0; 514 } 515 516 @Override 517 public void accept(long t) { 518 if (empty) { 519 empty = false; 520 state = t; 521 } 522 else { 523 state = operator.applyAsLong(state, t); 524 } 525 } 526 527 @Override 528 public OptionalLong get() { 529 return empty ? OptionalLong.empty() : OptionalLong.of(state); 530 } 531 532 @Override 533 public void combine(ReducingSink other) { 534 if (!other.empty) 535 accept(other.state); 536 } 537 } 538 return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { 539 @Override 540 public ReducingSink makeSink() { 541 return new ReducingSink(); 542 } 543 }; 544 } 545 546 /** 547 * Constructs a {@code TerminalOp} that implements a mutable reduce on 548 * {@code long} values. 549 * 550 * @param <R> the type of the result 551 * @param supplier a factory to produce a new accumulator of the result type 552 * @param accumulator a function to incorporate an int into an 553 * accumulator 554 * @param combiner a function to combine an accumulator into another 555 * @return a {@code TerminalOp} implementing the reduction 556 */ 557 public static <R> TerminalOp<Long, R> 558 makeLong(Supplier<R> supplier, 559 ObjLongConsumer<R> accumulator, 560 BinaryOperator<R> combiner) { 561 Objects.requireNonNull(supplier); 562 Objects.requireNonNull(accumulator); 563 Objects.requireNonNull(combiner); 564 class ReducingSink extends Box<R> 565 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { 566 @Override 567 public void begin(long size) { 568 state = supplier.get(); 569 } 570 571 @Override 572 public void accept(long t) { 573 accumulator.accept(state, t); 574 } 575 576 @Override 577 public void combine(ReducingSink other) { 578 state = combiner.apply(state, other.state); 579 } 580 } 581 return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { 582 @Override 583 public ReducingSink makeSink() { 584 return new ReducingSink(); 585 } 586 }; 587 } 588 589 /** 590 * Constructs a {@code TerminalOp} that counts the number of stream 591 * elements. If the size of the pipeline is known then count is the size 592 * and there is no need to evaluate the pipeline. If the size of the 593 * pipeline is non known then count is produced, via reduction, using a 594 * {@link CountingSink}. 595 * 596 * @return a {@code TerminalOp} implementing the counting 597 */ 598 public static TerminalOp<Long, Long> 599 makeLongCounting() { 600 return new ReduceOp<Long, Long, CountingSink<Long>>(StreamShape.LONG_VALUE) { 601 @Override 602 public CountingSink<Long> makeSink() { return new CountingSink.OfLong(); } 603 604 @Override 605 public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper, 606 Spliterator<P_IN> spliterator) { 607 long size = helper.exactOutputSizeIfKnown(spliterator); 608 if (size != -1) 609 return size; 610 return super.evaluateSequential(helper, spliterator); 611 } 612 613 @Override 614 public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper, 615 Spliterator<P_IN> spliterator) { 616 long size = helper.exactOutputSizeIfKnown(spliterator); 617 if (size != -1) 618 return size; 619 return super.evaluateParallel(helper, spliterator); 620 } 621 622 @Override 623 public int getOpFlags() { 624 return StreamOpFlag.NOT_ORDERED; 625 } 626 }; 627 } 628 629 /** 630 * Constructs a {@code TerminalOp} that implements a functional reduce on 631 * {@code double} values. 632 * 633 * @param identity the identity for the combining function 634 * @param operator the combining function 635 * @return a {@code TerminalOp} implementing the reduction 636 */ 637 public static TerminalOp<Double, Double> 638 makeDouble(double identity, DoubleBinaryOperator operator) { 639 Objects.requireNonNull(operator); 640 class ReducingSink 641 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { 642 private double state; 643 644 @Override 645 public void begin(long size) { 646 state = identity; 647 } 648 649 @Override 650 public void accept(double t) { 651 state = operator.applyAsDouble(state, t); 652 } 653 654 @Override 655 public Double get() { 656 return state; 657 } 658 659 @Override 660 public void combine(ReducingSink other) { 661 accept(other.state); 662 } 663 } 664 return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { 665 @Override 666 public ReducingSink makeSink() { 667 return new ReducingSink(); 668 } 669 }; 670 } 671 672 /** 673 * Constructs a {@code TerminalOp} that implements a functional reduce on 674 * {@code double} values, producing an optional double result. 675 * 676 * @param operator the combining function 677 * @return a {@code TerminalOp} implementing the reduction 678 */ 679 public static TerminalOp<Double, OptionalDouble> 680 makeDouble(DoubleBinaryOperator operator) { 681 Objects.requireNonNull(operator); 682 class ReducingSink 683 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { 684 private boolean empty; 685 private double state; 686 687 public void begin(long size) { 688 empty = true; 689 state = 0; 690 } 691 692 @Override 693 public void accept(double t) { 694 if (empty) { 695 empty = false; 696 state = t; 697 } 698 else { 699 state = operator.applyAsDouble(state, t); 700 } 701 } 702 703 @Override 704 public OptionalDouble get() { 705 return empty ? OptionalDouble.empty() : OptionalDouble.of(state); 706 } 707 708 @Override 709 public void combine(ReducingSink other) { 710 if (!other.empty) 711 accept(other.state); 712 } 713 } 714 return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { 715 @Override 716 public ReducingSink makeSink() { 717 return new ReducingSink(); 718 } 719 }; 720 } 721 722 /** 723 * Constructs a {@code TerminalOp} that implements a mutable reduce on 724 * {@code double} values. 725 * 726 * @param <R> the type of the result 727 * @param supplier a factory to produce a new accumulator of the result type 728 * @param accumulator a function to incorporate an int into an 729 * accumulator 730 * @param combiner a function to combine an accumulator into another 731 * @return a {@code TerminalOp} implementing the reduction 732 */ 733 public static <R> TerminalOp<Double, R> 734 makeDouble(Supplier<R> supplier, 735 ObjDoubleConsumer<R> accumulator, 736 BinaryOperator<R> combiner) { 737 Objects.requireNonNull(supplier); 738 Objects.requireNonNull(accumulator); 739 Objects.requireNonNull(combiner); 740 class ReducingSink extends Box<R> 741 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { 742 @Override 743 public void begin(long size) { 744 state = supplier.get(); 745 } 746 747 @Override 748 public void accept(double t) { 749 accumulator.accept(state, t); 750 } 751 752 @Override 753 public void combine(ReducingSink other) { 754 state = combiner.apply(state, other.state); 755 } 756 } 757 return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { 758 @Override 759 public ReducingSink makeSink() { 760 return new ReducingSink(); 761 } 762 }; 763 } 764 765 /** 766 * Constructs a {@code TerminalOp} that counts the number of stream 767 * elements. If the size of the pipeline is known then count is the size 768 * and there is no need to evaluate the pipeline. If the size of the 769 * pipeline is non known then count is produced, via reduction, using a 770 * {@link CountingSink}. 771 * 772 * @return a {@code TerminalOp} implementing the counting 773 */ 774 public static TerminalOp<Double, Long> 775 makeDoubleCounting() { 776 return new ReduceOp<Double, Long, CountingSink<Double>>(StreamShape.DOUBLE_VALUE) { 777 @Override 778 public CountingSink<Double> makeSink() { return new CountingSink.OfDouble(); } 779 780 @Override 781 public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper, 782 Spliterator<P_IN> spliterator) { 783 long size = helper.exactOutputSizeIfKnown(spliterator); 784 if (size != -1) 785 return size; 786 return super.evaluateSequential(helper, spliterator); 787 } 788 789 @Override 790 public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper, 791 Spliterator<P_IN> spliterator) { 792 long size = helper.exactOutputSizeIfKnown(spliterator); 793 if (size != -1) 794 return size; 795 return super.evaluateParallel(helper, spliterator); 796 } 797 798 @Override 799 public int getOpFlags() { 800 return StreamOpFlag.NOT_ORDERED; 801 } 802 }; 803 } 804 805 /** 806 * A sink that counts elements 807 */ 808 abstract static class CountingSink<T> 809 extends Box<Long> 810 implements AccumulatingSink<T, Long, CountingSink<T>> { 811 long count; 812 813 @Override 814 public void begin(long size) { 815 count = 0L; 816 } 817 818 @Override 819 public Long get() { 820 return count; 821 } 822 823 @Override 824 public void combine(CountingSink<T> other) { 825 count += other.count; 826 } 827 828 static final class OfRef<T> extends CountingSink<T> { 829 @Override 830 public void accept(T t) { 831 count++; 832 } 833 } 834 835 static final class OfInt extends CountingSink<Integer> implements Sink.OfInt { 836 @Override 837 public void accept(int t) { 838 count++; 839 } 840 } 841 842 static final class OfLong extends CountingSink<Long> implements Sink.OfLong { 843 @Override 844 public void accept(long t) { 845 count++; 846 } 847 } 848 849 static final class OfDouble extends CountingSink<Double> implements Sink.OfDouble { 850 @Override 851 public void accept(double t) { 852 count++; 853 } 854 } 855 } 856 857 /** 858 * A type of {@code TerminalSink} that implements an associative reducing 859 * operation on elements of type {@code T} and producing a result of type 860 * {@code R}. 861 * 862 * @param <T> the type of input element to the combining operation 863 * @param <R> the result type 864 * @param <K> the type of the {@code AccumulatingSink}. 865 */ 866 private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> 867 extends TerminalSink<T, R> { 868 void combine(K other); 869 } 870 871 /** 872 * State box for a single state element, used as a base class for 873 * {@code AccumulatingSink} instances 874 * 875 * @param <U> The type of the state element 876 */ 877 private abstract static class Box<U> { 878 U state; 879 880 Box() {} // Avoid creation of special accessor 881 882 public U get() { 883 return state; 884 } 885 } 886 887 /** 888 * A {@code TerminalOp} that evaluates a stream pipeline and sends the 889 * output into an {@code AccumulatingSink}, which performs a reduce 890 * operation. The {@code AccumulatingSink} must represent an associative 891 * reducing operation. 892 * 893 * @param <T> the output type of the stream pipeline 894 * @param <R> the result type of the reducing operation 895 * @param <S> the type of the {@code AccumulatingSink} 896 */ 897 private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> 898 implements TerminalOp<T, R> { 899 private final StreamShape inputShape; 900 901 /** 902 * Create a {@code ReduceOp} of the specified stream shape which uses 903 * the specified {@code Supplier} to create accumulating sinks. 904 * 905 * @param shape The shape of the stream pipeline 906 */ 907 ReduceOp(StreamShape shape) { 908 inputShape = shape; 909 } 910 911 public abstract S makeSink(); 912 913 @Override 914 public StreamShape inputShape() { 915 return inputShape; 916 } 917 918 @Override 919 public <P_IN> R evaluateSequential(PipelineHelper<T> helper, 920 Spliterator<P_IN> spliterator) { 921 return helper.wrapAndCopyInto(makeSink(), spliterator).get(); 922 } 923 924 @Override 925 public <P_IN> R evaluateParallel(PipelineHelper<T> helper, 926 Spliterator<P_IN> spliterator) { 927 return new ReduceTask<>(this, helper, spliterator).invoke().get(); 928 } 929 } 930 931 /** 932 * A {@code ForkJoinTask} for performing a parallel reduce operation. 933 */ 934 @SuppressWarnings("serial") 935 private static final class ReduceTask<P_IN, P_OUT, R, 936 S extends AccumulatingSink<P_OUT, R, S>> 937 extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { 938 private final ReduceOp<P_OUT, R, S> op; 939 940 ReduceTask(ReduceOp<P_OUT, R, S> op, 941 PipelineHelper<P_OUT> helper, 942 Spliterator<P_IN> spliterator) { 943 super(helper, spliterator); 944 this.op = op; 945 } 946 947 ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, 948 Spliterator<P_IN> spliterator) { 949 super(parent, spliterator); 950 this.op = parent.op; 951 } 952 953 @Override 954 protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { 955 return new ReduceTask<>(this, spliterator); 956 } 957 958 @Override 959 protected S doLeaf() { 960 return helper.wrapAndCopyInto(op.makeSink(), spliterator); 961 } 962 963 @Override 964 public void onCompletion(CountedCompleter<?> caller) { 965 if (!isLeaf()) { 966 S leftResult = leftChild.getLocalResult(); 967 leftResult.combine(rightChild.getLocalResult()); 968 setLocalResult(leftResult); 969 } 970 // GC spliterator, left and right child 971 super.onCompletion(caller); 972 } 973 } 974 } 975