1 /* 2 * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.util.stream; 26 27 import java.util.Spliterator; 28 import java.util.concurrent.CountedCompleter; 29 import java.util.function.IntFunction; 30 31 /** 32 * Factory for instances of a short-circuiting stateful intermediate operations 33 * that produce subsequences of their input stream. 34 * 35 * @since 1.8 36 */ 37 final class SliceOps { 38 39 // No instances SliceOps()40 private SliceOps() { } 41 42 /** 43 * Calculates the sliced size given the current size, number of elements 44 * skip, and the number of elements to limit. 45 * 46 * @param size the current size 47 * @param skip the number of elements to skip, assumed to be >= 0 48 * @param limit the number of elements to limit, assumed to be >= 0, with 49 * a value of {@code Long.MAX_VALUE} if there is no limit 50 * @return the sliced size 51 */ calcSize(long size, long skip, long limit)52 private static long calcSize(long size, long skip, long limit) { 53 return size >= 0 ? Math.max(0, Math.min(size - skip, limit)) : -1; 54 } 55 56 /** 57 * Calculates the slice fence, which is one past the index of the slice 58 * range 59 * @param skip the number of elements to skip, assumed to be >= 0 60 * @param limit the number of elements to limit, assumed to be >= 0, with 61 * a value of {@code Long.MAX_VALUE} if there is no limit 62 * @return the slice fence. 63 */ calcSliceFence(long skip, long limit)64 private static long calcSliceFence(long skip, long limit) { 65 long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE; 66 // Check for overflow 67 return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE; 68 } 69 70 /** 71 * Creates a slice spliterator given a stream shape governing the 72 * spliterator type. Requires that the underlying Spliterator 73 * be SUBSIZED. 74 */ sliceSpliterator(StreamShape shape, Spliterator<P_IN> s, long skip, long limit)75 private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape, 76 Spliterator<P_IN> s, 77 long skip, long limit) { 78 assert s.hasCharacteristics(Spliterator.SUBSIZED); 79 long sliceFence = calcSliceFence(skip, limit); 80 @SuppressWarnings("unchecked") 81 Spliterator<P_IN> sliceSpliterator = (Spliterator<P_IN>) switch (shape) { 82 case REFERENCE 83 -> new StreamSpliterators.SliceSpliterator.OfRef<>(s, skip, sliceFence); 84 case INT_VALUE 85 -> new StreamSpliterators.SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence); 86 case LONG_VALUE 87 -> new StreamSpliterators.SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence); 88 case DOUBLE_VALUE 89 -> new StreamSpliterators.SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence); 90 }; 91 return sliceSpliterator; 92 } 93 94 /** 95 * Appends a "slice" operation to the provided stream. The slice operation 96 * may be may be skip-only, limit-only, or skip-and-limit. 97 * 98 * @param <T> the type of both input and output elements 99 * @param upstream a reference stream with element type T 100 * @param skip the number of elements to skip. Must be >= 0. 101 * @param limit the maximum size of the resulting stream, or -1 if no limit 102 * is to be imposed 103 */ makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit)104 public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream, 105 long skip, long limit) { 106 if (skip < 0) 107 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 108 long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE; 109 110 return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, 111 flags(limit)) { 112 @Override 113 long exactOutputSize(long previousSize) { 114 return calcSize(previousSize, skip, normalizedLimit); 115 } 116 117 Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s, 118 long skip, long limit, long sizeIfKnown) { 119 if (skip <= sizeIfKnown) { 120 // Use just the limit if the number of elements 121 // to skip is <= the known pipeline size 122 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 123 skip = 0; 124 } 125 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit); 126 } 127 128 @Override 129 // Android-changed: Make public, to match the method it's overriding. 130 public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { 131 long size = helper.exactOutputSizeIfKnown(spliterator); 132 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 133 return new StreamSpliterators.SliceSpliterator.OfRef<>( 134 helper.wrapSpliterator(spliterator), 135 skip, 136 calcSliceFence(skip, limit)); 137 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 138 return unorderedSkipLimitSpliterator( 139 helper.wrapSpliterator(spliterator), 140 skip, limit, size); 141 } 142 else { 143 // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n) 144 // when n * parallelismLevel is sufficiently large. 145 // Need to adjust the target size of splitting for the 146 // SliceTask from say (size / k) to say min(size / k, 1 << 14) 147 // This will limit the size of the buffers created at the leaf nodes 148 // cancellation will be more aggressive cancelling later tasks 149 // if the target slice size has been reached from a given task, 150 // cancellation should also clear local results if any 151 return new SliceTask<>(this, helper, spliterator, Nodes.castingArray(), skip, limit). 152 invoke().spliterator(); 153 } 154 } 155 156 @Override 157 // Android-changed: Make public, to match the method it's overriding. 158 public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, 159 Spliterator<P_IN> spliterator, 160 IntFunction<T[]> generator) { 161 long size = helper.exactOutputSizeIfKnown(spliterator); 162 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 163 // Because the pipeline is SIZED the slice spliterator 164 // can be created from the source, this requires matching 165 // to shape of the source, and is potentially more efficient 166 // than creating the slice spliterator from the pipeline 167 // wrapping spliterator 168 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 169 return Nodes.collect(helper, s, true, generator); 170 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 171 Spliterator<T> s = unorderedSkipLimitSpliterator( 172 helper.wrapSpliterator(spliterator), 173 skip, limit, size); 174 // Collect using this pipeline, which is empty and therefore 175 // can be used with the pipeline wrapping spliterator 176 // Note that we cannot create a slice spliterator from 177 // the source spliterator if the pipeline is not SIZED 178 return Nodes.collect(this, s, true, generator); 179 } 180 else { 181 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 182 invoke(); 183 } 184 } 185 186 @Override 187 // Android-changed: Make public, to match the method it's overriding. 188 public Sink<T> opWrapSink(int flags, Sink<T> sink) { 189 return new Sink.ChainedReference<>(sink) { 190 long n = skip; 191 long m = normalizedLimit; 192 193 @Override 194 public void begin(long size) { 195 downstream.begin(calcSize(size, skip, m)); 196 } 197 198 @Override 199 public void accept(T t) { 200 if (n == 0) { 201 if (m > 0) { 202 m--; 203 downstream.accept(t); 204 } 205 } 206 else { 207 n--; 208 } 209 } 210 211 @Override 212 public boolean cancellationRequested() { 213 return m == 0 || downstream.cancellationRequested(); 214 } 215 }; 216 } 217 }; 218 } 219 220 /** 221 * Appends a "slice" operation to the provided IntStream. The slice 222 * operation may be may be skip-only, limit-only, or skip-and-limit. 223 * 224 * @param upstream An IntStream 225 * @param skip The number of elements to skip. Must be >= 0. 226 * @param limit The maximum size of the resulting stream, or -1 if no limit 227 * is to be imposed 228 */ 229 public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream, 230 long skip, long limit) { 231 if (skip < 0) 232 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 233 long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE; 234 235 return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, 236 flags(limit)) { 237 @Override 238 long exactOutputSize(long previousSize) { 239 return calcSize(previousSize, skip, normalizedLimit); 240 } 241 242 Spliterator.OfInt unorderedSkipLimitSpliterator( 243 Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) { 244 if (skip <= sizeIfKnown) { 245 // Use just the limit if the number of elements 246 // to skip is <= the known pipeline size 247 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 248 skip = 0; 249 } 250 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit); 251 } 252 253 @Override 254 // Android-changed: Make public, to match the method it's overriding. 255 public <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, 256 Spliterator<P_IN> spliterator) { 257 long size = helper.exactOutputSizeIfKnown(spliterator); 258 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 259 return new StreamSpliterators.SliceSpliterator.OfInt( 260 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 261 skip, 262 calcSliceFence(skip, limit)); 263 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 264 return unorderedSkipLimitSpliterator( 265 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 266 skip, limit, size); 267 } 268 else { 269 return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit). 270 invoke().spliterator(); 271 } 272 } 273 274 @Override 275 // Android-changed: Make public, to match the method it's overriding. 276 public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, 277 Spliterator<P_IN> spliterator, 278 IntFunction<Integer[]> generator) { 279 long size = helper.exactOutputSizeIfKnown(spliterator); 280 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 281 // Because the pipeline is SIZED the slice spliterator 282 // can be created from the source, this requires matching 283 // to shape of the source, and is potentially more efficient 284 // than creating the slice spliterator from the pipeline 285 // wrapping spliterator 286 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 287 return Nodes.collectInt(helper, s, true); 288 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 289 Spliterator.OfInt s = unorderedSkipLimitSpliterator( 290 (Spliterator.OfInt) helper.wrapSpliterator(spliterator), 291 skip, limit, size); 292 // Collect using this pipeline, which is empty and therefore 293 // can be used with the pipeline wrapping spliterator 294 // Note that we cannot create a slice spliterator from 295 // the source spliterator if the pipeline is not SIZED 296 return Nodes.collectInt(this, s, true); 297 } 298 else { 299 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 300 invoke(); 301 } 302 } 303 304 @Override 305 // Android-changed: Make public, to match the method it's overriding. 306 public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { 307 return new Sink.ChainedInt<>(sink) { 308 long n = skip; 309 long m = normalizedLimit; 310 311 @Override 312 public void begin(long size) { 313 downstream.begin(calcSize(size, skip, m)); 314 } 315 316 @Override 317 public void accept(int t) { 318 if (n == 0) { 319 if (m > 0) { 320 m--; 321 downstream.accept(t); 322 } 323 } 324 else { 325 n--; 326 } 327 } 328 329 @Override 330 public boolean cancellationRequested() { 331 return m == 0 || downstream.cancellationRequested(); 332 } 333 }; 334 } 335 }; 336 } 337 338 /** 339 * Appends a "slice" operation to the provided LongStream. The slice 340 * operation may be may be skip-only, limit-only, or skip-and-limit. 341 * 342 * @param upstream A LongStream 343 * @param skip The number of elements to skip. Must be >= 0. 344 * @param limit The maximum size of the resulting stream, or -1 if no limit 345 * is to be imposed 346 */ 347 public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream, 348 long skip, long limit) { 349 if (skip < 0) 350 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 351 long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE; 352 353 return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, 354 flags(limit)) { 355 @Override 356 long exactOutputSize(long previousSize) { 357 return calcSize(previousSize, skip, normalizedLimit); 358 } 359 360 Spliterator.OfLong unorderedSkipLimitSpliterator( 361 Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) { 362 if (skip <= sizeIfKnown) { 363 // Use just the limit if the number of elements 364 // to skip is <= the known pipeline size 365 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 366 skip = 0; 367 } 368 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit); 369 } 370 371 @Override 372 // Android-changed: Make public, to match the method it's overriding. 373 public <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, 374 Spliterator<P_IN> spliterator) { 375 long size = helper.exactOutputSizeIfKnown(spliterator); 376 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 377 return new StreamSpliterators.SliceSpliterator.OfLong( 378 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 379 skip, 380 calcSliceFence(skip, limit)); 381 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 382 return unorderedSkipLimitSpliterator( 383 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 384 skip, limit, size); 385 } 386 else { 387 return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit). 388 invoke().spliterator(); 389 } 390 } 391 392 @Override 393 // Android-changed: Make public, to match the method it's overriding. 394 public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, 395 Spliterator<P_IN> spliterator, 396 IntFunction<Long[]> generator) { 397 long size = helper.exactOutputSizeIfKnown(spliterator); 398 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 399 // Because the pipeline is SIZED the slice spliterator 400 // can be created from the source, this requires matching 401 // to shape of the source, and is potentially more efficient 402 // than creating the slice spliterator from the pipeline 403 // wrapping spliterator 404 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 405 return Nodes.collectLong(helper, s, true); 406 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 407 Spliterator.OfLong s = unorderedSkipLimitSpliterator( 408 (Spliterator.OfLong) helper.wrapSpliterator(spliterator), 409 skip, limit, size); 410 // Collect using this pipeline, which is empty and therefore 411 // can be used with the pipeline wrapping spliterator 412 // Note that we cannot create a slice spliterator from 413 // the source spliterator if the pipeline is not SIZED 414 return Nodes.collectLong(this, s, true); 415 } 416 else { 417 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 418 invoke(); 419 } 420 } 421 422 @Override 423 // Android-changed: Make public, to match the method it's overriding. 424 public Sink<Long> opWrapSink(int flags, Sink<Long> sink) { 425 return new Sink.ChainedLong<>(sink) { 426 long n = skip; 427 long m = normalizedLimit; 428 429 @Override 430 public void begin(long size) { 431 downstream.begin(calcSize(size, skip, m)); 432 } 433 434 @Override 435 public void accept(long t) { 436 if (n == 0) { 437 if (m > 0) { 438 m--; 439 downstream.accept(t); 440 } 441 } 442 else { 443 n--; 444 } 445 } 446 447 @Override 448 public boolean cancellationRequested() { 449 return m == 0 || downstream.cancellationRequested(); 450 } 451 }; 452 } 453 }; 454 } 455 456 /** 457 * Appends a "slice" operation to the provided DoubleStream. The slice 458 * operation may be may be skip-only, limit-only, or skip-and-limit. 459 * 460 * @param upstream A DoubleStream 461 * @param skip The number of elements to skip. Must be >= 0. 462 * @param limit The maximum size of the resulting stream, or -1 if no limit 463 * is to be imposed 464 */ 465 public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream, 466 long skip, long limit) { 467 if (skip < 0) 468 throw new IllegalArgumentException("Skip must be non-negative: " + skip); 469 long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE; 470 471 return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, 472 flags(limit)) { 473 @Override 474 long exactOutputSize(long previousSize) { 475 return calcSize(previousSize, skip, normalizedLimit); 476 } 477 478 Spliterator.OfDouble unorderedSkipLimitSpliterator( 479 Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) { 480 if (skip <= sizeIfKnown) { 481 // Use just the limit if the number of elements 482 // to skip is <= the known pipeline size 483 limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip; 484 skip = 0; 485 } 486 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit); 487 } 488 489 @Override 490 // Android-changed: Make public, to match the method it's overriding. 491 public <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, 492 Spliterator<P_IN> spliterator) { 493 long size = helper.exactOutputSizeIfKnown(spliterator); 494 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 495 return new StreamSpliterators.SliceSpliterator.OfDouble( 496 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 497 skip, 498 calcSliceFence(skip, limit)); 499 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 500 return unorderedSkipLimitSpliterator( 501 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 502 skip, limit, size); 503 } 504 else { 505 return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit). 506 invoke().spliterator(); 507 } 508 } 509 510 @Override 511 // Android-changed: Make public, to match the method it's overriding. 512 public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, 513 Spliterator<P_IN> spliterator, 514 IntFunction<Double[]> generator) { 515 long size = helper.exactOutputSizeIfKnown(spliterator); 516 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { 517 // Because the pipeline is SIZED the slice spliterator 518 // can be created from the source, this requires matching 519 // to shape of the source, and is potentially more efficient 520 // than creating the slice spliterator from the pipeline 521 // wrapping spliterator 522 Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit); 523 return Nodes.collectDouble(helper, s, true); 524 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { 525 Spliterator.OfDouble s = unorderedSkipLimitSpliterator( 526 (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), 527 skip, limit, size); 528 // Collect using this pipeline, which is empty and therefore 529 // can be used with the pipeline wrapping spliterator 530 // Note that we cannot create a slice spliterator from 531 // the source spliterator if the pipeline is not SIZED 532 return Nodes.collectDouble(this, s, true); 533 } 534 else { 535 return new SliceTask<>(this, helper, spliterator, generator, skip, limit). 536 invoke(); 537 } 538 } 539 540 @Override 541 // Android-changed: Make public, to match the method it's overriding. 542 public Sink<Double> opWrapSink(int flags, Sink<Double> sink) { 543 return new Sink.ChainedDouble<>(sink) { 544 long n = skip; 545 long m = normalizedLimit; 546 547 @Override 548 public void begin(long size) { 549 downstream.begin(calcSize(size, skip, m)); 550 } 551 552 @Override 553 public void accept(double t) { 554 if (n == 0) { 555 if (m > 0) { 556 m--; 557 downstream.accept(t); 558 } 559 } 560 else { 561 n--; 562 } 563 } 564 565 @Override 566 public boolean cancellationRequested() { 567 return m == 0 || downstream.cancellationRequested(); 568 } 569 }; 570 } 571 }; 572 } 573 574 private static int flags(long limit) { 575 return StreamOpFlag.IS_SIZE_ADJUSTING | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0); 576 } 577 578 /** 579 * {@code ForkJoinTask} implementing slice computation. 580 * 581 * @param <P_IN> Input element type to the stream pipeline 582 * @param <P_OUT> Output element type from the stream pipeline 583 */ 584 @SuppressWarnings("serial") 585 private static final class SliceTask<P_IN, P_OUT> 586 extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> { 587 private final AbstractPipeline<P_OUT, P_OUT, ?> op; 588 private final IntFunction<P_OUT[]> generator; 589 private final long targetOffset, targetSize; 590 private long thisNodeSize; 591 592 private volatile boolean completed; 593 594 SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op, 595 PipelineHelper<P_OUT> helper, 596 Spliterator<P_IN> spliterator, 597 IntFunction<P_OUT[]> generator, 598 long offset, long size) { 599 super(helper, spliterator); 600 this.op = op; 601 this.generator = generator; 602 this.targetOffset = offset; 603 this.targetSize = size; 604 } 605 606 SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { 607 super(parent, spliterator); 608 this.op = parent.op; 609 this.generator = parent.generator; 610 this.targetOffset = parent.targetOffset; 611 this.targetSize = parent.targetSize; 612 } 613 614 @Override 615 protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { 616 return new SliceTask<>(this, spliterator); 617 } 618 619 @Override 620 protected final Node<P_OUT> getEmptyResult() { 621 return Nodes.emptyNode(op.getOutputShape()); 622 } 623 624 @Override 625 protected final Node<P_OUT> doLeaf() { 626 if (isRoot()) { 627 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) 628 ? op.exactOutputSizeIfKnown(spliterator) 629 : -1; 630 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator); 631 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb); 632 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); 633 // There is no need to truncate since the op performs the 634 // skipping and limiting of elements 635 return nb.build(); 636 } 637 else { 638 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(-1, generator); 639 if (targetOffset == 0) { // limit only 640 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb); 641 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator); 642 } 643 else { 644 helper.wrapAndCopyInto(nb, spliterator); 645 } 646 Node<P_OUT> node = nb.build(); 647 thisNodeSize = node.count(); 648 completed = true; 649 spliterator = null; 650 return node; 651 } 652 } 653 654 @Override 655 public final void onCompletion(CountedCompleter<?> caller) { 656 if (!isLeaf()) { 657 Node<P_OUT> result; 658 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; 659 if (canceled) { 660 thisNodeSize = 0; 661 result = getEmptyResult(); 662 } 663 else if (thisNodeSize == 0) 664 result = getEmptyResult(); 665 else if (leftChild.thisNodeSize == 0) 666 result = rightChild.getLocalResult(); 667 else { 668 result = Nodes.conc(op.getOutputShape(), 669 leftChild.getLocalResult(), rightChild.getLocalResult()); 670 } 671 setLocalResult(isRoot() ? doTruncate(result) : result); 672 completed = true; 673 } 674 if (targetSize >= 0 675 && !isRoot() 676 && isLeftCompleted(targetOffset + targetSize)) 677 cancelLaterNodes(); 678 679 super.onCompletion(caller); 680 } 681 682 @Override 683 protected void cancel() { 684 super.cancel(); 685 if (completed) 686 setLocalResult(getEmptyResult()); 687 } 688 689 private Node<P_OUT> doTruncate(Node<P_OUT> input) { 690 long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize; 691 return input.truncate(targetOffset, to, generator); 692 } 693 694 /** 695 * Determine if the number of completed elements in this node and nodes 696 * to the left of this node is greater than or equal to the target size. 697 * 698 * @param target the target size 699 * @return true if the number of elements is greater than or equal to 700 * the target size, otherwise false. 701 */ 702 private boolean isLeftCompleted(long target) { 703 long size = completed ? thisNodeSize : completedSize(target); 704 if (size >= target) 705 return true; 706 for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this; 707 parent != null; 708 node = parent, parent = parent.getParent()) { 709 if (node == parent.rightChild) { 710 SliceTask<P_IN, P_OUT> left = parent.leftChild; 711 if (left != null) { 712 size += left.completedSize(target); 713 if (size >= target) 714 return true; 715 } 716 } 717 } 718 return size >= target; 719 } 720 721 /** 722 * Compute the number of completed elements in this node. 723 * <p> 724 * Computation terminates if all nodes have been processed or the 725 * number of completed elements is greater than or equal to the target 726 * size. 727 * 728 * @param target the target size 729 * @return the number of completed elements 730 */ 731 private long completedSize(long target) { 732 if (completed) 733 return thisNodeSize; 734 else { 735 SliceTask<P_IN, P_OUT> left = leftChild; 736 SliceTask<P_IN, P_OUT> right = rightChild; 737 if (left == null || right == null) { 738 // must be completed 739 return thisNodeSize; 740 } 741 else { 742 long leftSize = left.completedSize(target); 743 return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target); 744 } 745 } 746 } 747 } 748 } 749