1 /*
2  * Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Spliterator;
28 import java.util.concurrent.CountedCompleter;
29 import java.util.function.IntFunction;
30 
31 /**
32  * Factory for instances of a short-circuiting stateful intermediate operations
33  * that produce subsequences of their input stream.
34  *
35  * @since 1.8
36  */
37 final class SliceOps {
38 
39     // No instances
SliceOps()40     private SliceOps() { }
41 
42     /**
43      * Calculates the sliced size given the current size, number of elements
44      * skip, and the number of elements to limit.
45      *
46      * @param size the current size
47      * @param skip the number of elements to skip, assumed to be >= 0
48      * @param limit the number of elements to limit, assumed to be >= 0, with
49      *        a value of {@code Long.MAX_VALUE} if there is no limit
50      * @return the sliced size
51      */
calcSize(long size, long skip, long limit)52     private static long calcSize(long size, long skip, long limit) {
53         return size >= 0 ? Math.max(0, Math.min(size - skip, limit)) : -1;
54     }
55 
56     /**
57      * Calculates the slice fence, which is one past the index of the slice
58      * range
59      * @param skip the number of elements to skip, assumed to be >= 0
60      * @param limit the number of elements to limit, assumed to be >= 0, with
61      *        a value of {@code Long.MAX_VALUE} if there is no limit
62      * @return the slice fence.
63      */
calcSliceFence(long skip, long limit)64     private static long calcSliceFence(long skip, long limit) {
65         long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
66         // Check for overflow
67         return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE;
68     }
69 
70     /**
71      * Creates a slice spliterator given a stream shape governing the
72      * spliterator type.  Requires that the underlying Spliterator
73      * be SUBSIZED.
74      */
sliceSpliterator(StreamShape shape, Spliterator<P_IN> s, long skip, long limit)75     private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
76                                                              Spliterator<P_IN> s,
77                                                              long skip, long limit) {
78         assert s.hasCharacteristics(Spliterator.SUBSIZED);
79         long sliceFence = calcSliceFence(skip, limit);
80         @SuppressWarnings("unchecked")
81         Spliterator<P_IN> sliceSpliterator = (Spliterator<P_IN>) switch (shape) {
82             case REFERENCE
83                 -> new StreamSpliterators.SliceSpliterator.OfRef<>(s, skip, sliceFence);
84             case INT_VALUE
85                 -> new StreamSpliterators.SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
86             case LONG_VALUE
87                 -> new StreamSpliterators.SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
88             case DOUBLE_VALUE
89                 -> new StreamSpliterators.SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
90         };
91         return sliceSpliterator;
92     }
93 
94     /**
95      * Appends a "slice" operation to the provided stream.  The slice operation
96      * may be may be skip-only, limit-only, or skip-and-limit.
97      *
98      * @param <T> the type of both input and output elements
99      * @param upstream a reference stream with element type T
100      * @param skip the number of elements to skip.  Must be >= 0.
101      * @param limit the maximum size of the resulting stream, or -1 if no limit
102      *        is to be imposed
103      */
makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit)104     public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
105                                         long skip, long limit) {
106         if (skip < 0)
107             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
108         long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;
109 
110         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
111                                                       flags(limit)) {
112             @Override
113             long exactOutputSize(long previousSize) {
114                 return calcSize(previousSize, skip, normalizedLimit);
115             }
116 
117             Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
118                                                          long skip, long limit, long sizeIfKnown) {
119                 if (skip <= sizeIfKnown) {
120                     // Use just the limit if the number of elements
121                     // to skip is <= the known pipeline size
122                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
123                     skip = 0;
124                 }
125                 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
126             }
127 
128             @Override
129             // Android-changed: Make public, to match the method it's overriding.
130             public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
131                 long size = helper.exactOutputSizeIfKnown(spliterator);
132                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
133                     return new StreamSpliterators.SliceSpliterator.OfRef<>(
134                             helper.wrapSpliterator(spliterator),
135                             skip,
136                             calcSliceFence(skip, limit));
137                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
138                     return unorderedSkipLimitSpliterator(
139                             helper.wrapSpliterator(spliterator),
140                             skip, limit, size);
141                 }
142                 else {
143                     // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
144                     //     when n * parallelismLevel is sufficiently large.
145                     //     Need to adjust the target size of splitting for the
146                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
147                     //     This will limit the size of the buffers created at the leaf nodes
148                     //     cancellation will be more aggressive cancelling later tasks
149                     //     if the target slice size has been reached from a given task,
150                     //     cancellation should also clear local results if any
151                     return new SliceTask<>(this, helper, spliterator, Nodes.castingArray(), skip, limit).
152                             invoke().spliterator();
153                 }
154             }
155 
156             @Override
157             // Android-changed: Make public, to match the method it's overriding.
158             public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
159                                               Spliterator<P_IN> spliterator,
160                                               IntFunction<T[]> generator) {
161                 long size = helper.exactOutputSizeIfKnown(spliterator);
162                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
163                     // Because the pipeline is SIZED the slice spliterator
164                     // can be created from the source, this requires matching
165                     // to shape of the source, and is potentially more efficient
166                     // than creating the slice spliterator from the pipeline
167                     // wrapping spliterator
168                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
169                     return Nodes.collect(helper, s, true, generator);
170                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
171                     Spliterator<T> s =  unorderedSkipLimitSpliterator(
172                             helper.wrapSpliterator(spliterator),
173                             skip, limit, size);
174                     // Collect using this pipeline, which is empty and therefore
175                     // can be used with the pipeline wrapping spliterator
176                     // Note that we cannot create a slice spliterator from
177                     // the source spliterator if the pipeline is not SIZED
178                     return Nodes.collect(this, s, true, generator);
179                 }
180                 else {
181                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
182                             invoke();
183                 }
184             }
185 
186             @Override
187             // Android-changed: Make public, to match the method it's overriding.
188             public Sink<T> opWrapSink(int flags, Sink<T> sink) {
189                 return new Sink.ChainedReference<>(sink) {
190                     long n = skip;
191                     long m = normalizedLimit;
192 
193                     @Override
194                     public void begin(long size) {
195                         downstream.begin(calcSize(size, skip, m));
196                     }
197 
198                     @Override
199                     public void accept(T t) {
200                         if (n == 0) {
201                             if (m > 0) {
202                                 m--;
203                                 downstream.accept(t);
204                             }
205                         }
206                         else {
207                             n--;
208                         }
209                     }
210 
211                     @Override
212                     public boolean cancellationRequested() {
213                         return m == 0 || downstream.cancellationRequested();
214                     }
215                 };
216             }
217         };
218     }
219 
220     /**
221      * Appends a "slice" operation to the provided IntStream.  The slice
222      * operation may be may be skip-only, limit-only, or skip-and-limit.
223      *
224      * @param upstream An IntStream
225      * @param skip The number of elements to skip.  Must be >= 0.
226      * @param limit The maximum size of the resulting stream, or -1 if no limit
227      *        is to be imposed
228      */
229     public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
230                                     long skip, long limit) {
231         if (skip < 0)
232             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
233         long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;
234 
235         return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
236                                                    flags(limit)) {
237             @Override
238             long exactOutputSize(long previousSize) {
239                 return calcSize(previousSize, skip, normalizedLimit);
240             }
241 
242             Spliterator.OfInt unorderedSkipLimitSpliterator(
243                     Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
244                 if (skip <= sizeIfKnown) {
245                     // Use just the limit if the number of elements
246                     // to skip is <= the known pipeline size
247                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
248                     skip = 0;
249                 }
250                 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
251             }
252 
253             @Override
254             // Android-changed: Make public, to match the method it's overriding.
255             public <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
256                                                                Spliterator<P_IN> spliterator) {
257                 long size = helper.exactOutputSizeIfKnown(spliterator);
258                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
259                     return new StreamSpliterators.SliceSpliterator.OfInt(
260                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
261                             skip,
262                             calcSliceFence(skip, limit));
263                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
264                     return unorderedSkipLimitSpliterator(
265                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
266                             skip, limit, size);
267                 }
268                 else {
269                     return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
270                             invoke().spliterator();
271                 }
272             }
273 
274             @Override
275             // Android-changed: Make public, to match the method it's overriding.
276             public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
277                                                     Spliterator<P_IN> spliterator,
278                                                     IntFunction<Integer[]> generator) {
279                 long size = helper.exactOutputSizeIfKnown(spliterator);
280                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
281                     // Because the pipeline is SIZED the slice spliterator
282                     // can be created from the source, this requires matching
283                     // to shape of the source, and is potentially more efficient
284                     // than creating the slice spliterator from the pipeline
285                     // wrapping spliterator
286                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
287                     return Nodes.collectInt(helper, s, true);
288                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
289                     Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
290                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
291                             skip, limit, size);
292                     // Collect using this pipeline, which is empty and therefore
293                     // can be used with the pipeline wrapping spliterator
294                     // Note that we cannot create a slice spliterator from
295                     // the source spliterator if the pipeline is not SIZED
296                     return Nodes.collectInt(this, s, true);
297                 }
298                 else {
299                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
300                             invoke();
301                 }
302             }
303 
304             @Override
305             // Android-changed: Make public, to match the method it's overriding.
306             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
307                 return new Sink.ChainedInt<>(sink) {
308                     long n = skip;
309                     long m = normalizedLimit;
310 
311                     @Override
312                     public void begin(long size) {
313                         downstream.begin(calcSize(size, skip, m));
314                     }
315 
316                     @Override
317                     public void accept(int t) {
318                         if (n == 0) {
319                             if (m > 0) {
320                                 m--;
321                                 downstream.accept(t);
322                             }
323                         }
324                         else {
325                             n--;
326                         }
327                     }
328 
329                     @Override
330                     public boolean cancellationRequested() {
331                         return m == 0 || downstream.cancellationRequested();
332                     }
333                 };
334             }
335         };
336     }
337 
338     /**
339      * Appends a "slice" operation to the provided LongStream.  The slice
340      * operation may be may be skip-only, limit-only, or skip-and-limit.
341      *
342      * @param upstream A LongStream
343      * @param skip The number of elements to skip.  Must be >= 0.
344      * @param limit The maximum size of the resulting stream, or -1 if no limit
345      *        is to be imposed
346      */
347     public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
348                                       long skip, long limit) {
349         if (skip < 0)
350             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
351         long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;
352 
353         return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
354                                                  flags(limit)) {
355             @Override
356             long exactOutputSize(long previousSize) {
357                 return calcSize(previousSize, skip, normalizedLimit);
358             }
359 
360             Spliterator.OfLong unorderedSkipLimitSpliterator(
361                     Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
362                 if (skip <= sizeIfKnown) {
363                     // Use just the limit if the number of elements
364                     // to skip is <= the known pipeline size
365                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
366                     skip = 0;
367                 }
368                 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
369             }
370 
371             @Override
372             // Android-changed: Make public, to match the method it's overriding.
373             public <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
374                                                             Spliterator<P_IN> spliterator) {
375                 long size = helper.exactOutputSizeIfKnown(spliterator);
376                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
377                     return new StreamSpliterators.SliceSpliterator.OfLong(
378                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
379                             skip,
380                             calcSliceFence(skip, limit));
381                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
382                     return unorderedSkipLimitSpliterator(
383                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
384                             skip, limit, size);
385                 }
386                 else {
387                     return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
388                             invoke().spliterator();
389                 }
390             }
391 
392             @Override
393             // Android-changed: Make public, to match the method it's overriding.
394             public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
395                                                  Spliterator<P_IN> spliterator,
396                                                  IntFunction<Long[]> generator) {
397                 long size = helper.exactOutputSizeIfKnown(spliterator);
398                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
399                     // Because the pipeline is SIZED the slice spliterator
400                     // can be created from the source, this requires matching
401                     // to shape of the source, and is potentially more efficient
402                     // than creating the slice spliterator from the pipeline
403                     // wrapping spliterator
404                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
405                     return Nodes.collectLong(helper, s, true);
406                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
407                     Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
408                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
409                             skip, limit, size);
410                     // Collect using this pipeline, which is empty and therefore
411                     // can be used with the pipeline wrapping spliterator
412                     // Note that we cannot create a slice spliterator from
413                     // the source spliterator if the pipeline is not SIZED
414                     return Nodes.collectLong(this, s, true);
415                 }
416                 else {
417                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
418                             invoke();
419                 }
420             }
421 
422             @Override
423             // Android-changed: Make public, to match the method it's overriding.
424             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
425                 return new Sink.ChainedLong<>(sink) {
426                     long n = skip;
427                     long m = normalizedLimit;
428 
429                     @Override
430                     public void begin(long size) {
431                         downstream.begin(calcSize(size, skip, m));
432                     }
433 
434                     @Override
435                     public void accept(long t) {
436                         if (n == 0) {
437                             if (m > 0) {
438                                 m--;
439                                 downstream.accept(t);
440                             }
441                         }
442                         else {
443                             n--;
444                         }
445                     }
446 
447                     @Override
448                     public boolean cancellationRequested() {
449                         return m == 0 || downstream.cancellationRequested();
450                     }
451                 };
452             }
453         };
454     }
455 
456     /**
457      * Appends a "slice" operation to the provided DoubleStream.  The slice
458      * operation may be may be skip-only, limit-only, or skip-and-limit.
459      *
460      * @param upstream A DoubleStream
461      * @param skip The number of elements to skip.  Must be >= 0.
462      * @param limit The maximum size of the resulting stream, or -1 if no limit
463      *        is to be imposed
464      */
465     public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
466                                           long skip, long limit) {
467         if (skip < 0)
468             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
469         long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;
470 
471         return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
472                                                      flags(limit)) {
473             @Override
474             long exactOutputSize(long previousSize) {
475                 return calcSize(previousSize, skip, normalizedLimit);
476             }
477 
478             Spliterator.OfDouble unorderedSkipLimitSpliterator(
479                     Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
480                 if (skip <= sizeIfKnown) {
481                     // Use just the limit if the number of elements
482                     // to skip is <= the known pipeline size
483                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
484                     skip = 0;
485                 }
486                 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
487             }
488 
489             @Override
490             // Android-changed: Make public, to match the method it's overriding.
491             public <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
492                                                               Spliterator<P_IN> spliterator) {
493                 long size = helper.exactOutputSizeIfKnown(spliterator);
494                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
495                     return new StreamSpliterators.SliceSpliterator.OfDouble(
496                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
497                             skip,
498                             calcSliceFence(skip, limit));
499                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
500                     return unorderedSkipLimitSpliterator(
501                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
502                             skip, limit, size);
503                 }
504                 else {
505                     return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
506                             invoke().spliterator();
507                 }
508             }
509 
510             @Override
511             // Android-changed: Make public, to match the method it's overriding.
512             public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
513                                                    Spliterator<P_IN> spliterator,
514                                                    IntFunction<Double[]> generator) {
515                 long size = helper.exactOutputSizeIfKnown(spliterator);
516                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
517                     // Because the pipeline is SIZED the slice spliterator
518                     // can be created from the source, this requires matching
519                     // to shape of the source, and is potentially more efficient
520                     // than creating the slice spliterator from the pipeline
521                     // wrapping spliterator
522                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
523                     return Nodes.collectDouble(helper, s, true);
524                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
525                     Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
526                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
527                             skip, limit, size);
528                     // Collect using this pipeline, which is empty and therefore
529                     // can be used with the pipeline wrapping spliterator
530                     // Note that we cannot create a slice spliterator from
531                     // the source spliterator if the pipeline is not SIZED
532                     return Nodes.collectDouble(this, s, true);
533                 }
534                 else {
535                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
536                             invoke();
537                 }
538             }
539 
540             @Override
541             // Android-changed: Make public, to match the method it's overriding.
542             public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
543                 return new Sink.ChainedDouble<>(sink) {
544                     long n = skip;
545                     long m = normalizedLimit;
546 
547                     @Override
548                     public void begin(long size) {
549                         downstream.begin(calcSize(size, skip, m));
550                     }
551 
552                     @Override
553                     public void accept(double t) {
554                         if (n == 0) {
555                             if (m > 0) {
556                                 m--;
557                                 downstream.accept(t);
558                             }
559                         }
560                         else {
561                             n--;
562                         }
563                     }
564 
565                     @Override
566                     public boolean cancellationRequested() {
567                         return m == 0 || downstream.cancellationRequested();
568                     }
569                 };
570             }
571         };
572     }
573 
574     private static int flags(long limit) {
575         return StreamOpFlag.IS_SIZE_ADJUSTING | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
576     }
577 
578     /**
579      * {@code ForkJoinTask} implementing slice computation.
580      *
581      * @param <P_IN> Input element type to the stream pipeline
582      * @param <P_OUT> Output element type from the stream pipeline
583      */
584     @SuppressWarnings("serial")
585     private static final class SliceTask<P_IN, P_OUT>
586             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
587         private final AbstractPipeline<P_OUT, P_OUT, ?> op;
588         private final IntFunction<P_OUT[]> generator;
589         private final long targetOffset, targetSize;
590         private long thisNodeSize;
591 
592         private volatile boolean completed;
593 
594         SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
595                   PipelineHelper<P_OUT> helper,
596                   Spliterator<P_IN> spliterator,
597                   IntFunction<P_OUT[]> generator,
598                   long offset, long size) {
599             super(helper, spliterator);
600             this.op = op;
601             this.generator = generator;
602             this.targetOffset = offset;
603             this.targetSize = size;
604         }
605 
606         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
607             super(parent, spliterator);
608             this.op = parent.op;
609             this.generator = parent.generator;
610             this.targetOffset = parent.targetOffset;
611             this.targetSize = parent.targetSize;
612         }
613 
614         @Override
615         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
616             return new SliceTask<>(this, spliterator);
617         }
618 
619         @Override
620         protected final Node<P_OUT> getEmptyResult() {
621             return Nodes.emptyNode(op.getOutputShape());
622         }
623 
624         @Override
625         protected final Node<P_OUT> doLeaf() {
626             if (isRoot()) {
627                 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
628                                    ? op.exactOutputSizeIfKnown(spliterator)
629                                    : -1;
630                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
631                 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
632                 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
633                 // There is no need to truncate since the op performs the
634                 // skipping and limiting of elements
635                 return nb.build();
636             }
637             else {
638                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(-1, generator);
639                 if (targetOffset == 0) { // limit only
640                     Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
641                     helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
642                 }
643                 else {
644                     helper.wrapAndCopyInto(nb, spliterator);
645                 }
646                 Node<P_OUT> node = nb.build();
647                 thisNodeSize = node.count();
648                 completed = true;
649                 spliterator = null;
650                 return node;
651             }
652         }
653 
654         @Override
655         public final void onCompletion(CountedCompleter<?> caller) {
656             if (!isLeaf()) {
657                 Node<P_OUT> result;
658                 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
659                 if (canceled) {
660                     thisNodeSize = 0;
661                     result = getEmptyResult();
662                 }
663                 else if (thisNodeSize == 0)
664                     result = getEmptyResult();
665                 else if (leftChild.thisNodeSize == 0)
666                     result = rightChild.getLocalResult();
667                 else {
668                     result = Nodes.conc(op.getOutputShape(),
669                                         leftChild.getLocalResult(), rightChild.getLocalResult());
670                 }
671                 setLocalResult(isRoot() ? doTruncate(result) : result);
672                 completed = true;
673             }
674             if (targetSize >= 0
675                 && !isRoot()
676                 && isLeftCompleted(targetOffset + targetSize))
677                     cancelLaterNodes();
678 
679             super.onCompletion(caller);
680         }
681 
682         @Override
683         protected void cancel() {
684             super.cancel();
685             if (completed)
686                 setLocalResult(getEmptyResult());
687         }
688 
689         private Node<P_OUT> doTruncate(Node<P_OUT> input) {
690             long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
691             return input.truncate(targetOffset, to, generator);
692         }
693 
694         /**
695          * Determine if the number of completed elements in this node and nodes
696          * to the left of this node is greater than or equal to the target size.
697          *
698          * @param target the target size
699          * @return true if the number of elements is greater than or equal to
700          *         the target size, otherwise false.
701          */
702         private boolean isLeftCompleted(long target) {
703             long size = completed ? thisNodeSize : completedSize(target);
704             if (size >= target)
705                 return true;
706             for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
707                  parent != null;
708                  node = parent, parent = parent.getParent()) {
709                 if (node == parent.rightChild) {
710                     SliceTask<P_IN, P_OUT> left = parent.leftChild;
711                     if (left != null) {
712                         size += left.completedSize(target);
713                         if (size >= target)
714                             return true;
715                     }
716                 }
717             }
718             return size >= target;
719         }
720 
721         /**
722          * Compute the number of completed elements in this node.
723          * <p>
724          * Computation terminates if all nodes have been processed or the
725          * number of completed elements is greater than or equal to the target
726          * size.
727          *
728          * @param target the target size
729          * @return the number of completed elements
730          */
731         private long completedSize(long target) {
732             if (completed)
733                 return thisNodeSize;
734             else {
735                 SliceTask<P_IN, P_OUT> left = leftChild;
736                 SliceTask<P_IN, P_OUT> right = rightChild;
737                 if (left == null || right == null) {
738                     // must be completed
739                     return thisNodeSize;
740                 }
741                 else {
742                     long leftSize = left.completedSize(target);
743                     return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
744                 }
745             }
746         }
747     }
748 }
749