1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Spliterator;
28 import java.util.concurrent.CountedCompleter;
29 import java.util.function.IntFunction;
30 
31 /**
 * Factory for instances of short-circuiting stateful intermediate operations
 * that produce subsequences of their input stream.
34  *
35  * @since 1.8
36  */
37 final class SliceOps {
38 
39     // No instances
SliceOps()40     private SliceOps() { }
41 
42     /**
     * Calculates the sliced size given the current size, the number of elements
     * to skip, and the number of elements to limit.
45      *
46      * @param size the current size
47      * @param skip the number of elements to skip, assumed to be >= 0
48      * @param limit the number of elements to limit, assumed to be >= 0, with
49      *        a value of {@code Long.MAX_VALUE} if there is no limit
50      * @return the sliced size
51      */
calcSize(long size, long skip, long limit)52     private static long calcSize(long size, long skip, long limit) {
53         return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
54     }
55 
56     /**
57      * Calculates the slice fence, which is one past the index of the slice
58      * range
59      * @param skip the number of elements to skip, assumed to be >= 0
60      * @param limit the number of elements to limit, assumed to be >= 0, with
61      *        a value of {@code Long.MAX_VALUE} if there is no limit
62      * @return the slice fence.
63      */
calcSliceFence(long skip, long limit)64     private static long calcSliceFence(long skip, long limit) {
65         long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
66         // Check for overflow
67         return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE;
68     }
69 
70     /**
71      * Creates a slice spliterator given a stream shape governing the
72      * spliterator type.  Requires that the underlying Spliterator
73      * be SUBSIZED.
74      */
75     @SuppressWarnings("unchecked")
sliceSpliterator(StreamShape shape, Spliterator<P_IN> s, long skip, long limit)76     private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
77                                                              Spliterator<P_IN> s,
78                                                              long skip, long limit) {
79         assert s.hasCharacteristics(Spliterator.SUBSIZED);
80         long sliceFence = calcSliceFence(skip, limit);
81         switch (shape) {
82             case REFERENCE:
83                 return new StreamSpliterators
84                         .SliceSpliterator.OfRef<>(s, skip, sliceFence);
85             case INT_VALUE:
86                 return (Spliterator<P_IN>) new StreamSpliterators
87                         .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
88             case LONG_VALUE:
89                 return (Spliterator<P_IN>) new StreamSpliterators
90                         .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
91             case DOUBLE_VALUE:
92                 return (Spliterator<P_IN>) new StreamSpliterators
93                         .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
94             default:
95                 throw new IllegalStateException("Unknown shape " + shape);
96         }
97     }
98 
99     @SuppressWarnings("unchecked")
castingArray()100     private static <T> IntFunction<T[]> castingArray() {
101         return size -> (T[]) new Object[size];
102     }
103 
104     /**
105      * Appends a "slice" operation to the provided stream.  The slice operation
     * may be skip-only, limit-only, or skip-and-limit.
107      *
108      * @param <T> the type of both input and output elements
109      * @param upstream a reference stream with element type T
110      * @param skip the number of elements to skip.  Must be >= 0.
111      * @param limit the maximum size of the resulting stream, or -1 if no limit
112      *        is to be imposed
113      */
makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit)114     public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
115                                         long skip, long limit) {
116         if (skip < 0)
117             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
118 
119         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
120                                                       flags(limit)) {
121             Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
122                                                          long skip, long limit, long sizeIfKnown) {
123                 if (skip <= sizeIfKnown) {
124                     // Use just the limit if the number of elements
125                     // to skip is <= the known pipeline size
126                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
127                     skip = 0;
128                 }
129                 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
130             }
131 
132             @Override
133             public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
134                 long size = helper.exactOutputSizeIfKnown(spliterator);
135                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
136                     return new StreamSpliterators.SliceSpliterator.OfRef<>(
137                             helper.wrapSpliterator(spliterator),
138                             skip,
139                             calcSliceFence(skip, limit));
140                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
141                     return unorderedSkipLimitSpliterator(
142                             helper.wrapSpliterator(spliterator),
143                             skip, limit, size);
144                 }
145                 else {
146                     // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
147                     //     regardless of the value of n
148                     //     Need to adjust the target size of splitting for the
149                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
150                     //     This will limit the size of the buffers created at the leaf nodes
151                     //     cancellation will be more aggressive cancelling later tasks
152                     //     if the target slice size has been reached from a given task,
153                     //     cancellation should also clear local results if any
154                     return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
155                             invoke().spliterator();
156                 }
157             }
158 
159             @Override
160             public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
161                                               Spliterator<P_IN> spliterator,
162                                               IntFunction<T[]> generator) {
163                 long size = helper.exactOutputSizeIfKnown(spliterator);
164                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
165                     // Because the pipeline is SIZED the slice spliterator
166                     // can be created from the source, this requires matching
167                     // to shape of the source, and is potentially more efficient
168                     // than creating the slice spliterator from the pipeline
169                     // wrapping spliterator
170                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
171                     return Nodes.collect(helper, s, true, generator);
172                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
173                     Spliterator<T> s =  unorderedSkipLimitSpliterator(
174                             helper.wrapSpliterator(spliterator),
175                             skip, limit, size);
176                     // Collect using this pipeline, which is empty and therefore
177                     // can be used with the pipeline wrapping spliterator
178                     // Note that we cannot create a slice spliterator from
179                     // the source spliterator if the pipeline is not SIZED
180                     return Nodes.collect(this, s, true, generator);
181                 }
182                 else {
183                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
184                             invoke();
185                 }
186             }
187 
188             @Override
189             public Sink<T> opWrapSink(int flags, Sink<T> sink) {
190                 return new Sink.ChainedReference<T, T>(sink) {
191                     long n = skip;
192                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
193 
194                     @Override
195                     public void begin(long size) {
196                         downstream.begin(calcSize(size, skip, m));
197                     }
198 
199                     @Override
200                     public void accept(T t) {
201                         if (n == 0) {
202                             if (m > 0) {
203                                 m--;
204                                 downstream.accept(t);
205                             }
206                         }
207                         else {
208                             n--;
209                         }
210                     }
211 
212                     @Override
213                     public boolean cancellationRequested() {
214                         return m == 0 || downstream.cancellationRequested();
215                     }
216                 };
217             }
218         };
219     }
220 
221     /**
222      * Appends a "slice" operation to the provided IntStream.  The slice
     * operation may be skip-only, limit-only, or skip-and-limit.
224      *
225      * @param upstream An IntStream
226      * @param skip The number of elements to skip.  Must be >= 0.
227      * @param limit The maximum size of the resulting stream, or -1 if no limit
228      *        is to be imposed
229      */
230     public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
231                                     long skip, long limit) {
232         if (skip < 0)
233             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
234 
235         return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
236                                                    flags(limit)) {
237             Spliterator.OfInt unorderedSkipLimitSpliterator(
238                     Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
239                 if (skip <= sizeIfKnown) {
240                     // Use just the limit if the number of elements
241                     // to skip is <= the known pipeline size
242                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
243                     skip = 0;
244                 }
245                 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
246             }
247 
248             @Override
249             public <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
250                                                                Spliterator<P_IN> spliterator) {
251                 long size = helper.exactOutputSizeIfKnown(spliterator);
252                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
253                     return new StreamSpliterators.SliceSpliterator.OfInt(
254                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
255                             skip,
256                             calcSliceFence(skip, limit));
257                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
258                     return unorderedSkipLimitSpliterator(
259                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
260                             skip, limit, size);
261                 }
262                 else {
263                     return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
264                             invoke().spliterator();
265                 }
266             }
267 
268             @Override
269             public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
270                                                     Spliterator<P_IN> spliterator,
271                                                     IntFunction<Integer[]> generator) {
272                 long size = helper.exactOutputSizeIfKnown(spliterator);
273                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
274                     // Because the pipeline is SIZED the slice spliterator
275                     // can be created from the source, this requires matching
276                     // to shape of the source, and is potentially more efficient
277                     // than creating the slice spliterator from the pipeline
278                     // wrapping spliterator
279                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
280                     return Nodes.collectInt(helper, s, true);
281                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
282                     Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
283                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
284                             skip, limit, size);
285                     // Collect using this pipeline, which is empty and therefore
286                     // can be used with the pipeline wrapping spliterator
287                     // Note that we cannot create a slice spliterator from
288                     // the source spliterator if the pipeline is not SIZED
289                     return Nodes.collectInt(this, s, true);
290                 }
291                 else {
292                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
293                             invoke();
294                 }
295             }
296 
297             @Override
298             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
299                 return new Sink.ChainedInt<Integer>(sink) {
300                     long n = skip;
301                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
302 
303                     @Override
304                     public void begin(long size) {
305                         downstream.begin(calcSize(size, skip, m));
306                     }
307 
308                     @Override
309                     public void accept(int t) {
310                         if (n == 0) {
311                             if (m > 0) {
312                                 m--;
313                                 downstream.accept(t);
314                             }
315                         }
316                         else {
317                             n--;
318                         }
319                     }
320 
321                     @Override
322                     public boolean cancellationRequested() {
323                         return m == 0 || downstream.cancellationRequested();
324                     }
325                 };
326             }
327         };
328     }
329 
330     /**
331      * Appends a "slice" operation to the provided LongStream.  The slice
     * operation may be skip-only, limit-only, or skip-and-limit.
333      *
334      * @param upstream A LongStream
335      * @param skip The number of elements to skip.  Must be >= 0.
336      * @param limit The maximum size of the resulting stream, or -1 if no limit
337      *        is to be imposed
338      */
339     public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
340                                       long skip, long limit) {
341         if (skip < 0)
342             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
343 
344         return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
345                                                  flags(limit)) {
346             Spliterator.OfLong unorderedSkipLimitSpliterator(
347                     Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
348                 if (skip <= sizeIfKnown) {
349                     // Use just the limit if the number of elements
350                     // to skip is <= the known pipeline size
351                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
352                     skip = 0;
353                 }
354                 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
355             }
356 
357             @Override
358             public <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
359                                                             Spliterator<P_IN> spliterator) {
360                 long size = helper.exactOutputSizeIfKnown(spliterator);
361                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
362                     return new StreamSpliterators.SliceSpliterator.OfLong(
363                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
364                             skip,
365                             calcSliceFence(skip, limit));
366                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
367                     return unorderedSkipLimitSpliterator(
368                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
369                             skip, limit, size);
370                 }
371                 else {
372                     return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
373                             invoke().spliterator();
374                 }
375             }
376 
377             @Override
378             public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
379                                                  Spliterator<P_IN> spliterator,
380                                                  IntFunction<Long[]> generator) {
381                 long size = helper.exactOutputSizeIfKnown(spliterator);
382                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
383                     // Because the pipeline is SIZED the slice spliterator
384                     // can be created from the source, this requires matching
385                     // to shape of the source, and is potentially more efficient
386                     // than creating the slice spliterator from the pipeline
387                     // wrapping spliterator
388                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
389                     return Nodes.collectLong(helper, s, true);
390                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
391                     Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
392                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
393                             skip, limit, size);
394                     // Collect using this pipeline, which is empty and therefore
395                     // can be used with the pipeline wrapping spliterator
396                     // Note that we cannot create a slice spliterator from
397                     // the source spliterator if the pipeline is not SIZED
398                     return Nodes.collectLong(this, s, true);
399                 }
400                 else {
401                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
402                             invoke();
403                 }
404             }
405 
406             @Override
407             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
408                 return new Sink.ChainedLong<Long>(sink) {
409                     long n = skip;
410                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
411 
412                     @Override
413                     public void begin(long size) {
414                         downstream.begin(calcSize(size, skip, m));
415                     }
416 
417                     @Override
418                     public void accept(long t) {
419                         if (n == 0) {
420                             if (m > 0) {
421                                 m--;
422                                 downstream.accept(t);
423                             }
424                         }
425                         else {
426                             n--;
427                         }
428                     }
429 
430                     @Override
431                     public boolean cancellationRequested() {
432                         return m == 0 || downstream.cancellationRequested();
433                     }
434                 };
435             }
436         };
437     }
438 
439     /**
440      * Appends a "slice" operation to the provided DoubleStream.  The slice
     * operation may be skip-only, limit-only, or skip-and-limit.
442      *
443      * @param upstream A DoubleStream
444      * @param skip The number of elements to skip.  Must be >= 0.
445      * @param limit The maximum size of the resulting stream, or -1 if no limit
446      *        is to be imposed
447      */
448     public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
449                                           long skip, long limit) {
450         if (skip < 0)
451             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
452 
453         return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
454                                                      flags(limit)) {
455             Spliterator.OfDouble unorderedSkipLimitSpliterator(
456                     Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
457                 if (skip <= sizeIfKnown) {
458                     // Use just the limit if the number of elements
459                     // to skip is <= the known pipeline size
460                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
461                     skip = 0;
462                 }
463                 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
464             }
465 
466             @Override
467             public <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
468                                                               Spliterator<P_IN> spliterator) {
469                 long size = helper.exactOutputSizeIfKnown(spliterator);
470                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
471                     return new StreamSpliterators.SliceSpliterator.OfDouble(
472                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
473                             skip,
474                             calcSliceFence(skip, limit));
475                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
476                     return unorderedSkipLimitSpliterator(
477                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
478                             skip, limit, size);
479                 }
480                 else {
481                     return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
482                             invoke().spliterator();
483                 }
484             }
485 
486             @Override
487             public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
488                                                    Spliterator<P_IN> spliterator,
489                                                    IntFunction<Double[]> generator) {
490                 long size = helper.exactOutputSizeIfKnown(spliterator);
491                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
492                     // Because the pipeline is SIZED the slice spliterator
493                     // can be created from the source, this requires matching
494                     // to shape of the source, and is potentially more efficient
495                     // than creating the slice spliterator from the pipeline
496                     // wrapping spliterator
497                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
498                     return Nodes.collectDouble(helper, s, true);
499                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
500                     Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
501                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
502                             skip, limit, size);
503                     // Collect using this pipeline, which is empty and therefore
504                     // can be used with the pipeline wrapping spliterator
505                     // Note that we cannot create a slice spliterator from
506                     // the source spliterator if the pipeline is not SIZED
507                     return Nodes.collectDouble(this, s, true);
508                 }
509                 else {
510                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
511                             invoke();
512                 }
513             }
514 
515             @Override
516             public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
517                 return new Sink.ChainedDouble<Double>(sink) {
518                     long n = skip;
519                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
520 
521                     @Override
522                     public void begin(long size) {
523                         downstream.begin(calcSize(size, skip, m));
524                     }
525 
526                     @Override
527                     public void accept(double t) {
528                         if (n == 0) {
529                             if (m > 0) {
530                                 m--;
531                                 downstream.accept(t);
532                             }
533                         }
534                         else {
535                             n--;
536                         }
537                     }
538 
539                     @Override
540                     public boolean cancellationRequested() {
541                         return m == 0 || downstream.cancellationRequested();
542                     }
543                 };
544             }
545         };
546     }
547 
548     private static int flags(long limit) {
549         return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
550     }
551 
552     /**
553      * {@code ForkJoinTask} implementing slice computation.
554      *
555      * @param <P_IN> Input element type to the stream pipeline
556      * @param <P_OUT> Output element type from the stream pipeline
557      */
558     @SuppressWarnings("serial")
559     private static final class SliceTask<P_IN, P_OUT>
560             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
        // The slice operation; supplies the output shape, node builders and
        // the skipping/limiting sink used by this task.
        private final AbstractPipeline<P_OUT, P_OUT, ?> op;
        // Array factory handed to node builders and to Node.truncate.
        private final IntFunction<P_OUT[]> generator;
        // Number of leading elements to drop (offset) and maximum number of
        // elements to keep (size); a negative targetSize is handled as
        // "no limit" by the checks in this class.
        private final long targetOffset, targetSize;
        // Element count of this node's result; written in doLeaf() for
        // non-root leaves and in onCompletion() for internal nodes.
        private long thisNodeSize;

        // Set to true after this task's result has been produced; read by
        // cancel() to decide whether a stored result must be replaced.
        private volatile boolean completed;
567 
568         SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
569                   PipelineHelper<P_OUT> helper,
570                   Spliterator<P_IN> spliterator,
571                   IntFunction<P_OUT[]> generator,
572                   long offset, long size) {
573             super(helper, spliterator);
574             this.op = op;
575             this.generator = generator;
576             this.targetOffset = offset;
577             this.targetSize = size;
578         }
579 
580         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
581             super(parent, spliterator);
582             this.op = parent.op;
583             this.generator = parent.generator;
584             this.targetOffset = parent.targetOffset;
585             this.targetSize = parent.targetSize;
586         }
587 
588         @Override
589         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
590             return new SliceTask<>(this, spliterator);
591         }
592 
593         @Override
594         protected final Node<P_OUT> getEmptyResult() {
595             return Nodes.emptyNode(op.getOutputShape());
596         }
597 
598         @Override
599         protected final Node<P_OUT> doLeaf() {
600             if (isRoot()) {
601                 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
602                                    ? op.exactOutputSizeIfKnown(spliterator)
603                                    : -1;
604                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
605                 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
606                 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
607                 // There is no need to truncate since the op performs the
608                 // skipping and limiting of elements
609                 return nb.build();
610             }
611             else {
612                 Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
613                                                           spliterator).build();
614                 thisNodeSize = node.count();
615                 completed = true;
616                 spliterator = null;
617                 return node;
618             }
619         }
620 
621         @Override
622         public final void onCompletion(CountedCompleter<?> caller) {
623             if (!isLeaf()) {
624                 Node<P_OUT> result;
625                 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
626                 if (canceled) {
627                     thisNodeSize = 0;
628                     result = getEmptyResult();
629                 }
630                 else if (thisNodeSize == 0)
631                     result = getEmptyResult();
632                 else if (leftChild.thisNodeSize == 0)
633                     result = rightChild.getLocalResult();
634                 else {
635                     result = Nodes.conc(op.getOutputShape(),
636                                         leftChild.getLocalResult(), rightChild.getLocalResult());
637                 }
638                 setLocalResult(isRoot() ? doTruncate(result) : result);
639                 completed = true;
640             }
641             if (targetSize >= 0
642                 && !isRoot()
643                 && isLeftCompleted(targetOffset + targetSize))
644                     cancelLaterNodes();
645 
646             super.onCompletion(caller);
647         }
648 
649         @Override
650         protected void cancel() {
651             super.cancel();
652             if (completed)
653                 setLocalResult(getEmptyResult());
654         }
655 
656         private Node<P_OUT> doTruncate(Node<P_OUT> input) {
657             long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
658             return input.truncate(targetOffset, to, generator);
659         }
660 
661         /**
662          * Determine if the number of completed elements in this node and nodes
663          * to the left of this node is greater than or equal to the target size.
664          *
665          * @param target the target size
666          * @return true if the number of elements is greater than or equal to
667          *         the target size, otherwise false.
668          */
669         private boolean isLeftCompleted(long target) {
670             long size = completed ? thisNodeSize : completedSize(target);
671             if (size >= target)
672                 return true;
673             for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
674                  parent != null;
675                  node = parent, parent = parent.getParent()) {
676                 if (node == parent.rightChild) {
677                     SliceTask<P_IN, P_OUT> left = parent.leftChild;
678                     if (left != null) {
679                         size += left.completedSize(target);
680                         if (size >= target)
681                             return true;
682                     }
683                 }
684             }
685             return size >= target;
686         }
687 
688         /**
689          * Compute the number of completed elements in this node.
690          * <p>
691          * Computation terminates if all nodes have been processed or the
692          * number of completed elements is greater than or equal to the target
693          * size.
694          *
695          * @param target the target size
696          * @return return the number of completed elements
697          */
698         private long completedSize(long target) {
699             if (completed)
700                 return thisNodeSize;
701             else {
702                 SliceTask<P_IN, P_OUT> left = leftChild;
703                 SliceTask<P_IN, P_OUT> right = rightChild;
704                 if (left == null || right == null) {
705                     // must be completed
706                     return thisNodeSize;
707                 }
708                 else {
709                     long leftSize = left.completedSize(target);
710                     return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
711                 }
712             }
713         }
714     }
715 }
716