1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Comparator;
28 import java.util.Objects;
29 import java.util.Spliterator;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.function.BooleanSupplier;
33 import java.util.function.Consumer;
34 import java.util.function.DoubleConsumer;
35 import java.util.function.DoubleSupplier;
36 import java.util.function.IntConsumer;
37 import java.util.function.IntSupplier;
38 import java.util.function.LongConsumer;
39 import java.util.function.LongSupplier;
40 import java.util.function.Supplier;
41 
42 /**
43  * Spliterator implementations for wrapping and delegating spliterators, used
44  * in the implementation of the {@link Stream#spliterator()} method.
45  *
46  * @since 1.8
47  */
48 class StreamSpliterators {
49 
50     /**
51      * Abstract wrapping spliterator that binds to the spliterator of a
52      * pipeline helper on first operation.
53      *
54      * <p>This spliterator is not late-binding and will bind to the source
55      * spliterator when first operated on.
56      *
57      * <p>A wrapping spliterator produced from a sequential stream
58      * cannot be split if there are stateful operations present.
59      */
60     private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT,
61                                                               T_BUFFER extends AbstractSpinedBuffer>
62             implements Spliterator<P_OUT> {
63 
64         // @@@ Detect if stateful operations are present or not
65         //     If not then can split otherwise cannot
66 
67         /**
68          * True if this spliterator supports splitting
69          */
70         final boolean isParallel;
71 
72         final PipelineHelper<P_OUT> ph;
73 
74         /**
75          * Supplier for the source spliterator.  Client provides either a
76          * spliterator or a supplier.
77          */
78         private Supplier<Spliterator<P_IN>> spliteratorSupplier;
79 
80         /**
81          * Source spliterator.  Either provided from client or obtained from
82          * supplier.
83          */
84         Spliterator<P_IN> spliterator;
85 
86         /**
87          * Sink chain for the downstream stages of the pipeline, ultimately
88          * leading to the buffer. Used during partial traversal.
89          */
90         Sink<P_IN> bufferSink;
91 
92         /**
93          * A function that advances one element of the spliterator, pushing
94          * it to bufferSink.  Returns whether any elements were processed.
95          * Used during partial traversal.
96          */
97         BooleanSupplier pusher;
98 
99         /** Next element to consume from the buffer, used during partial traversal */
100         long nextToConsume;
101 
102         /** Buffer into which elements are pushed.  Used during partial traversal. */
103         T_BUFFER buffer;
104 
105         /**
106          * True if full traversal has occurred (with possible cancelation).
107          * If doing a partial traversal, there may be still elements in buffer.
108          */
109         boolean finished;
110 
111         /**
112          * Construct an AbstractWrappingSpliterator from a
113          * {@code Supplier<Spliterator>}.
114          */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel)115         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
116                                     Supplier<Spliterator<P_IN>> spliteratorSupplier,
117                                     boolean parallel) {
118             this.ph = ph;
119             this.spliteratorSupplier = spliteratorSupplier;
120             this.spliterator = null;
121             this.isParallel = parallel;
122         }
123 
124         /**
125          * Construct an AbstractWrappingSpliterator from a
126          * {@code Spliterator}.
127          */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel)128         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
129                                     Spliterator<P_IN> spliterator,
130                                     boolean parallel) {
131             this.ph = ph;
132             this.spliteratorSupplier = null;
133             this.spliterator = spliterator;
134             this.isParallel = parallel;
135         }
136 
137         /**
138          * Called before advancing to set up spliterator, if needed.
139          */
init()140         final void init() {
141             if (spliterator == null) {
142                 spliterator = spliteratorSupplier.get();
143                 spliteratorSupplier = null;
144             }
145         }
146 
147         /**
148          * Get an element from the source, pushing it into the sink chain,
149          * setting up the buffer if needed
150          * @return whether there are elements to consume from the buffer
151          */
doAdvance()152         final boolean doAdvance() {
153             if (buffer == null) {
154                 if (finished)
155                     return false;
156 
157                 init();
158                 initPartialTraversalState();
159                 nextToConsume = 0;
160                 bufferSink.begin(spliterator.getExactSizeIfKnown());
161                 return fillBuffer();
162             }
163             else {
164                 ++nextToConsume;
165                 boolean hasNext = nextToConsume < buffer.count();
166                 if (!hasNext) {
167                     nextToConsume = 0;
168                     buffer.clear();
169                     hasNext = fillBuffer();
170                 }
171                 return hasNext;
172             }
173         }
174 
175         /**
176          * Invokes the shape-specific constructor with the provided arguments
177          * and returns the result.
178          */
179         abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s);
180 
181         /**
182          * Initializes buffer, sink chain, and pusher for a shape-specific
183          * implementation.
184          */
185         abstract void initPartialTraversalState();
186 
187         @Override
188         public Spliterator<P_OUT> trySplit() {
189             if (isParallel && !finished) {
190                 init();
191 
192                 Spliterator<P_IN> split = spliterator.trySplit();
193                 return (split == null) ? null : wrap(split);
194             }
195             else
196                 return null;
197         }
198 
199         /**
200          * If the buffer is empty, push elements into the sink chain until
201          * the source is empty or cancellation is requested.
202          * @return whether there are elements to consume from the buffer
203          */
204         private boolean fillBuffer() {
205             while (buffer.count() == 0) {
206                 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
207                     if (finished)
208                         return false;
209                     else {
210                         bufferSink.end(); // might trigger more elements
211                         finished = true;
212                     }
213                 }
214             }
215             return true;
216         }
217 
218         @Override
219         public final long estimateSize() {
220             init();
221             // Use the estimate of the wrapped spliterator
222             // Note this may not be accurate if there are filter/flatMap
223             // operations filtering or adding elements to the stream
224             return spliterator.estimateSize();
225         }
226 
227         @Override
228         public final long getExactSizeIfKnown() {
229             init();
230             return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
231                    ? spliterator.getExactSizeIfKnown()
232                    : -1;
233         }
234 
235         @Override
236         public final int characteristics() {
237             init();
238 
239             // Get the characteristics from the pipeline
240             int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));
241 
242             // Mask off the size and uniform characteristics and replace with
243             // those of the spliterator
244             // Note that a non-uniform spliterator can change from something
245             // with an exact size to an estimate for a sub-split, for example
246             // with HashSet where the size is known at the top level spliterator
247             // but for sub-splits only an estimate is known
248             if ((c & Spliterator.SIZED) != 0) {
249                 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
250                 c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED));
251             }
252 
253             return c;
254         }
255 
256         @Override
257         public Comparator<? super P_OUT> getComparator() {
258             if (!hasCharacteristics(SORTED))
259                 throw new IllegalStateException();
260             return null;
261         }
262 
263         @Override
264         public final String toString() {
265             return String.format("%s[%s]", getClass().getName(), spliterator);
266         }
267     }
268 
269     static final class WrappingSpliterator<P_IN, P_OUT>
270             extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
271 
272         WrappingSpliterator(PipelineHelper<P_OUT> ph,
273                             Supplier<Spliterator<P_IN>> supplier,
274                             boolean parallel) {
275             super(ph, supplier, parallel);
276         }
277 
278         WrappingSpliterator(PipelineHelper<P_OUT> ph,
279                             Spliterator<P_IN> spliterator,
280                             boolean parallel) {
281             super(ph, spliterator, parallel);
282         }
283 
284         @Override
285         WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
286             return new WrappingSpliterator<>(ph, s, isParallel);
287         }
288 
289         @Override
290         void initPartialTraversalState() {
291             SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
292             buffer = b;
293             bufferSink = ph.wrapSink(b::accept);
294             pusher = () -> spliterator.tryAdvance(bufferSink);
295         }
296 
297         @Override
tryAdvance(Consumer<? super P_OUT> consumer)298         public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
299             Objects.requireNonNull(consumer);
300             boolean hasNext = doAdvance();
301             if (hasNext)
302                 consumer.accept(buffer.get(nextToConsume));
303             return hasNext;
304         }
305 
306         @Override
forEachRemaining(Consumer<? super P_OUT> consumer)307         public void forEachRemaining(Consumer<? super P_OUT> consumer) {
308             if (buffer == null && !finished) {
309                 Objects.requireNonNull(consumer);
310                 init();
311 
312                 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
313                 finished = true;
314             }
315             else {
316                 do { } while (tryAdvance(consumer));
317             }
318         }
319     }
320 
321     static final class IntWrappingSpliterator<P_IN>
322             extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt>
323             implements Spliterator.OfInt {
324 
IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)325         IntWrappingSpliterator(PipelineHelper<Integer> ph,
326                                Supplier<Spliterator<P_IN>> supplier,
327                                boolean parallel) {
328             super(ph, supplier, parallel);
329         }
330 
IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel)331         IntWrappingSpliterator(PipelineHelper<Integer> ph,
332                                Spliterator<P_IN> spliterator,
333                                boolean parallel) {
334             super(ph, spliterator, parallel);
335         }
336 
337         @Override
wrap(Spliterator<P_IN> s)338         AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) {
339             return new IntWrappingSpliterator<>(ph, s, isParallel);
340         }
341 
342         @Override
initPartialTraversalState()343         void initPartialTraversalState() {
344             SpinedBuffer.OfInt b = new SpinedBuffer.OfInt();
345             buffer = b;
346             bufferSink = ph.wrapSink((Sink.OfInt) b::accept);
347             pusher = () -> spliterator.tryAdvance(bufferSink);
348         }
349 
350         @Override
trySplit()351         public Spliterator.OfInt trySplit() {
352             return (Spliterator.OfInt) super.trySplit();
353         }
354 
355         @Override
tryAdvance(IntConsumer consumer)356         public boolean tryAdvance(IntConsumer consumer) {
357             Objects.requireNonNull(consumer);
358             boolean hasNext = doAdvance();
359             if (hasNext)
360                 consumer.accept(buffer.get(nextToConsume));
361             return hasNext;
362         }
363 
364         @Override
forEachRemaining(IntConsumer consumer)365         public void forEachRemaining(IntConsumer consumer) {
366             if (buffer == null && !finished) {
367                 Objects.requireNonNull(consumer);
368                 init();
369 
370                 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
371                 finished = true;
372             }
373             else {
374                 do { } while (tryAdvance(consumer));
375             }
376         }
377     }
378 
379     static final class LongWrappingSpliterator<P_IN>
380             extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong>
381             implements Spliterator.OfLong {
382 
LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)383         LongWrappingSpliterator(PipelineHelper<Long> ph,
384                                 Supplier<Spliterator<P_IN>> supplier,
385                                 boolean parallel) {
386             super(ph, supplier, parallel);
387         }
388 
LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel)389         LongWrappingSpliterator(PipelineHelper<Long> ph,
390                                 Spliterator<P_IN> spliterator,
391                                 boolean parallel) {
392             super(ph, spliterator, parallel);
393         }
394 
395         @Override
wrap(Spliterator<P_IN> s)396         AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) {
397             return new LongWrappingSpliterator<>(ph, s, isParallel);
398         }
399 
400         @Override
initPartialTraversalState()401         void initPartialTraversalState() {
402             SpinedBuffer.OfLong b = new SpinedBuffer.OfLong();
403             buffer = b;
404             bufferSink = ph.wrapSink((Sink.OfLong) b::accept);
405             pusher = () -> spliterator.tryAdvance(bufferSink);
406         }
407 
408         @Override
trySplit()409         public Spliterator.OfLong trySplit() {
410             return (Spliterator.OfLong) super.trySplit();
411         }
412 
413         @Override
tryAdvance(LongConsumer consumer)414         public boolean tryAdvance(LongConsumer consumer) {
415             Objects.requireNonNull(consumer);
416             boolean hasNext = doAdvance();
417             if (hasNext)
418                 consumer.accept(buffer.get(nextToConsume));
419             return hasNext;
420         }
421 
422         @Override
forEachRemaining(LongConsumer consumer)423         public void forEachRemaining(LongConsumer consumer) {
424             if (buffer == null && !finished) {
425                 Objects.requireNonNull(consumer);
426                 init();
427 
428                 ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator);
429                 finished = true;
430             }
431             else {
432                 do { } while (tryAdvance(consumer));
433             }
434         }
435     }
436 
437     static final class DoubleWrappingSpliterator<P_IN>
438             extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble>
439             implements Spliterator.OfDouble {
440 
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)441         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
442                                   Supplier<Spliterator<P_IN>> supplier,
443                                   boolean parallel) {
444             super(ph, supplier, parallel);
445         }
446 
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel)447         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
448                                   Spliterator<P_IN> spliterator,
449                                   boolean parallel) {
450             super(ph, spliterator, parallel);
451         }
452 
453         @Override
wrap(Spliterator<P_IN> s)454         AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) {
455             return new DoubleWrappingSpliterator<>(ph, s, isParallel);
456         }
457 
458         @Override
initPartialTraversalState()459         void initPartialTraversalState() {
460             SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble();
461             buffer = b;
462             bufferSink = ph.wrapSink((Sink.OfDouble) b::accept);
463             pusher = () -> spliterator.tryAdvance(bufferSink);
464         }
465 
466         @Override
trySplit()467         public Spliterator.OfDouble trySplit() {
468             return (Spliterator.OfDouble) super.trySplit();
469         }
470 
471         @Override
tryAdvance(DoubleConsumer consumer)472         public boolean tryAdvance(DoubleConsumer consumer) {
473             Objects.requireNonNull(consumer);
474             boolean hasNext = doAdvance();
475             if (hasNext)
476                 consumer.accept(buffer.get(nextToConsume));
477             return hasNext;
478         }
479 
480         @Override
forEachRemaining(DoubleConsumer consumer)481         public void forEachRemaining(DoubleConsumer consumer) {
482             if (buffer == null && !finished) {
483                 Objects.requireNonNull(consumer);
484                 init();
485 
486                 ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator);
487                 finished = true;
488             }
489             else {
490                 do { } while (tryAdvance(consumer));
491             }
492         }
493     }
494 
495     /**
496      * Spliterator implementation that delegates to an underlying spliterator,
497      * acquiring the spliterator from a {@code Supplier<Spliterator>} on the
498      * first call to any spliterator method.
499      * @param <T>
500      */
501     static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>>
502             implements Spliterator<T> {
503         private final Supplier<? extends T_SPLITR> supplier;
504 
505         private T_SPLITR s;
506 
DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier)507         DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) {
508             this.supplier = supplier;
509         }
510 
get()511         T_SPLITR get() {
512             if (s == null) {
513                 s = supplier.get();
514             }
515             return s;
516         }
517 
518         @Override
519         @SuppressWarnings("unchecked")
trySplit()520         public T_SPLITR trySplit() {
521             return (T_SPLITR) get().trySplit();
522         }
523 
524         @Override
tryAdvance(Consumer<? super T> consumer)525         public boolean tryAdvance(Consumer<? super T> consumer) {
526             return get().tryAdvance(consumer);
527         }
528 
529         @Override
forEachRemaining(Consumer<? super T> consumer)530         public void forEachRemaining(Consumer<? super T> consumer) {
531             get().forEachRemaining(consumer);
532         }
533 
534         @Override
estimateSize()535         public long estimateSize() {
536             return get().estimateSize();
537         }
538 
539         @Override
characteristics()540         public int characteristics() {
541             return get().characteristics();
542         }
543 
544         @Override
getComparator()545         public Comparator<? super T> getComparator() {
546             return get().getComparator();
547         }
548 
549         @Override
getExactSizeIfKnown()550         public long getExactSizeIfKnown() {
551             return get().getExactSizeIfKnown();
552         }
553 
554         @Override
toString()555         public String toString() {
556             return getClass().getName() + "[" + get() + "]";
557         }
558 
559         static class OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
560             extends DelegatingSpliterator<T, T_SPLITR>
561             implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(Supplier<? extends T_SPLITR> supplier)562             OfPrimitive(Supplier<? extends T_SPLITR> supplier) {
563                 super(supplier);
564             }
565 
566             @Override
tryAdvance(T_CONS consumer)567             public boolean tryAdvance(T_CONS consumer) {
568                 return get().tryAdvance(consumer);
569             }
570 
571             @Override
forEachRemaining(T_CONS consumer)572             public void forEachRemaining(T_CONS consumer) {
573                 get().forEachRemaining(consumer);
574             }
575         }
576 
577         static final class OfInt
578                 extends OfPrimitive<Integer, IntConsumer, Spliterator.OfInt>
579                 implements Spliterator.OfInt {
580 
OfInt(Supplier<Spliterator.OfInt> supplier)581             OfInt(Supplier<Spliterator.OfInt> supplier) {
582                 super(supplier);
583             }
584         }
585 
586         static final class OfLong
587                 extends OfPrimitive<Long, LongConsumer, Spliterator.OfLong>
588                 implements Spliterator.OfLong {
589 
OfLong(Supplier<Spliterator.OfLong> supplier)590             OfLong(Supplier<Spliterator.OfLong> supplier) {
591                 super(supplier);
592             }
593         }
594 
595         static final class OfDouble
596                 extends OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble>
597                 implements Spliterator.OfDouble {
598 
OfDouble(Supplier<Spliterator.OfDouble> supplier)599             OfDouble(Supplier<Spliterator.OfDouble> supplier) {
600                 super(supplier);
601             }
602         }
603     }
604 
605     /**
606      * A slice Spliterator from a source Spliterator that reports
607      * {@code SUBSIZED}.
608      *
609      */
610     static abstract class SliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
611         // The start index of the slice
612         final long sliceOrigin;
613         // One past the last index of the slice
614         final long sliceFence;
615 
616         // The spliterator to slice
617         T_SPLITR s;
618         // current (absolute) index, modified on advance/split
619         long index;
620         // one past last (absolute) index or sliceFence, which ever is smaller
621         long fence;
622 
SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)623         SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) {
624             assert s.hasCharacteristics(Spliterator.SUBSIZED);
625             this.s = s;
626             this.sliceOrigin = sliceOrigin;
627             this.sliceFence = sliceFence;
628             this.index = origin;
629             this.fence = fence;
630         }
631 
makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)632         protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence);
633 
trySplit()634         public T_SPLITR trySplit() {
635             if (sliceOrigin >= fence)
636                 return null;
637 
638             if (index >= fence)
639                 return null;
640 
641             // Keep splitting until the left and right splits intersect with the slice
642             // thereby ensuring the size estimate decreases.
643             // This also avoids creating empty spliterators which can result in
644             // existing and additionally created F/J tasks that perform
645             // redundant work on no elements.
646             while (true) {
647                 @SuppressWarnings("unchecked")
648                 T_SPLITR leftSplit = (T_SPLITR) s.trySplit();
649                 if (leftSplit == null)
650                     return null;
651 
652                 long leftSplitFenceUnbounded = index + leftSplit.estimateSize();
653                 long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence);
654                 if (sliceOrigin >= leftSplitFence) {
655                     // The left split does not intersect with, and is to the left of, the slice
656                     // The right split does intersect
657                     // Discard the left split and split further with the right split
658                     index = leftSplitFence;
659                 }
660                 else if (leftSplitFence >= sliceFence) {
661                     // The right split does not intersect with, and is to the right of, the slice
662                     // The left split does intersect
663                     // Discard the right split and split further with the left split
664                     s = leftSplit;
665                     fence = leftSplitFence;
666                 }
667                 else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) {
668                     // The left split is contained within the slice, return the underlying left split
669                     // Right split is contained within or intersects with the slice
670                     index = leftSplitFence;
671                     return leftSplit;
672                 } else {
673                     // The left split intersects with the slice
674                     // Right split is contained within or intersects with the slice
675                     return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence);
676                 }
677             }
678         }
679 
estimateSize()680         public long estimateSize() {
681             return (sliceOrigin < fence)
682                    ? fence - Math.max(sliceOrigin, index) : 0;
683         }
684 
characteristics()685         public int characteristics() {
686             return s.characteristics();
687         }
688 
689         static final class OfRef<T>
690                 extends SliceSpliterator<T, Spliterator<T>>
691                 implements Spliterator<T> {
692 
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence)693             OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence) {
694                 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
695             }
696 
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)697             private OfRef(Spliterator<T> s,
698                           long sliceOrigin, long sliceFence, long origin, long fence) {
699                 super(s, sliceOrigin, sliceFence, origin, fence);
700             }
701 
702             @Override
makeSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)703             protected Spliterator<T> makeSpliterator(Spliterator<T> s,
704                                                      long sliceOrigin, long sliceFence,
705                                                      long origin, long fence) {
706                 return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence);
707             }
708 
709             @Override
tryAdvance(Consumer<? super T> action)710             public boolean tryAdvance(Consumer<? super T> action) {
711                 Objects.requireNonNull(action);
712 
713                 if (sliceOrigin >= fence)
714                     return false;
715 
716                 while (sliceOrigin > index) {
717                     s.tryAdvance(e -> {});
718                     index++;
719                 }
720 
721                 if (index >= fence)
722                     return false;
723 
724                 index++;
725                 return s.tryAdvance(action);
726             }
727 
728             @Override
forEachRemaining(Consumer<? super T> action)729             public void forEachRemaining(Consumer<? super T> action) {
730                 Objects.requireNonNull(action);
731 
732                 if (sliceOrigin >= fence)
733                     return;
734 
735                 if (index >= fence)
736                     return;
737 
738                 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
739                     // The spliterator is contained within the slice
740                     s.forEachRemaining(action);
741                     index = fence;
742                 } else {
743                     // The spliterator intersects with the slice
744                     while (sliceOrigin > index) {
745                         s.tryAdvance(e -> {});
746                         index++;
747                     }
748                     // Traverse elements up to the fence
749                     for (;index < fence; index++) {
750                         s.tryAdvance(action);
751                     }
752                 }
753             }
754         }
755 
756         static abstract class OfPrimitive<T,
757                 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>,
758                 T_CONS>
759                 extends SliceSpliterator<T, T_SPLITR>
760                 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
761 
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence)762             OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) {
763                 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
764             }
765 
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)766             private OfPrimitive(T_SPLITR s,
767                                 long sliceOrigin, long sliceFence, long origin, long fence) {
768                 super(s, sliceOrigin, sliceFence, origin, fence);
769             }
770 
771             @Override
tryAdvance(T_CONS action)772             public boolean tryAdvance(T_CONS action) {
773                 Objects.requireNonNull(action);
774 
775                 if (sliceOrigin >= fence)
776                     return false;
777 
778                 while (sliceOrigin > index) {
779                     s.tryAdvance(emptyConsumer());
780                     index++;
781                 }
782 
783                 if (index >= fence)
784                     return false;
785 
786                 index++;
787                 return s.tryAdvance(action);
788             }
789 
790             @Override
forEachRemaining(T_CONS action)791             public void forEachRemaining(T_CONS action) {
792                 Objects.requireNonNull(action);
793 
794                 if (sliceOrigin >= fence)
795                     return;
796 
797                 if (index >= fence)
798                     return;
799 
800                 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
801                     // The spliterator is contained within the slice
802                     s.forEachRemaining(action);
803                     index = fence;
804                 } else {
805                     // The spliterator intersects with the slice
806                     while (sliceOrigin > index) {
807                         s.tryAdvance(emptyConsumer());
808                         index++;
809                     }
810                     // Traverse elements up to the fence
811                     for (;index < fence; index++) {
812                         s.tryAdvance(action);
813                     }
814                 }
815             }
816 
emptyConsumer()817             protected abstract T_CONS emptyConsumer();
818         }
819 
820         static final class OfInt extends OfPrimitive<Integer, Spliterator.OfInt, IntConsumer>
821                 implements Spliterator.OfInt {
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence)822             OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) {
823                 super(s, sliceOrigin, sliceFence);
824             }
825 
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)826             OfInt(Spliterator.OfInt s,
827                   long sliceOrigin, long sliceFence, long origin, long fence) {
828                 super(s, sliceOrigin, sliceFence, origin, fence);
829             }
830 
831             @Override
makeSpliterator(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)832             protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s,
833                                                         long sliceOrigin, long sliceFence,
834                                                         long origin, long fence) {
835                 return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence);
836             }
837 
838             @Override
emptyConsumer()839             protected IntConsumer emptyConsumer() {
840                 return e -> {};
841             }
842         }
843 
844         static final class OfLong extends OfPrimitive<Long, Spliterator.OfLong, LongConsumer>
845                 implements Spliterator.OfLong {
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence)846             OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) {
847                 super(s, sliceOrigin, sliceFence);
848             }
849 
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)850             OfLong(Spliterator.OfLong s,
851                    long sliceOrigin, long sliceFence, long origin, long fence) {
852                 super(s, sliceOrigin, sliceFence, origin, fence);
853             }
854 
855             @Override
makeSpliterator(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)856             protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s,
857                                                          long sliceOrigin, long sliceFence,
858                                                          long origin, long fence) {
859                 return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence);
860             }
861 
862             @Override
emptyConsumer()863             protected LongConsumer emptyConsumer() {
864                 return e -> {};
865             }
866         }
867 
868         static final class OfDouble extends OfPrimitive<Double, Spliterator.OfDouble, DoubleConsumer>
869                 implements Spliterator.OfDouble {
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence)870             OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) {
871                 super(s, sliceOrigin, sliceFence);
872             }
873 
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)874             OfDouble(Spliterator.OfDouble s,
875                      long sliceOrigin, long sliceFence, long origin, long fence) {
876                 super(s, sliceOrigin, sliceFence, origin, fence);
877             }
878 
879             @Override
makeSpliterator(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)880             protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s,
881                                                            long sliceOrigin, long sliceFence,
882                                                            long origin, long fence) {
883                 return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence);
884             }
885 
886             @Override
emptyConsumer()887             protected DoubleConsumer emptyConsumer() {
888                 return e -> {};
889             }
890         }
891     }
892 
893     /**
894      * A slice Spliterator that does not preserve order, if any, of a source
895      * Spliterator.
896      *
897      * Note: The source spliterator may report {@code ORDERED} since that
898      * spliterator be the result of a previous pipeline stage that was
899      * collected to a {@code Node}. It is the order of the pipeline stage
900      * that governs whether the this slice spliterator is to be used or not.
901      */
902     static abstract class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
903         static final int CHUNK_SIZE = 1 << 7;
904 
905         // The spliterator to slice
906         protected final T_SPLITR s;
907         protected final boolean unlimited;
908         private final long skipThreshold;
909         private final AtomicLong permits;
910 
UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit)911         UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
912             this.s = s;
913             this.unlimited = limit < 0;
914             this.skipThreshold = limit >= 0 ? limit : 0;
915             this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
916         }
917 
UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator<T, T_SPLITR> parent)918         UnorderedSliceSpliterator(T_SPLITR s,
919                                   UnorderedSliceSpliterator<T, T_SPLITR> parent) {
920             this.s = s;
921             this.unlimited = parent.unlimited;
922             this.permits = parent.permits;
923             this.skipThreshold = parent.skipThreshold;
924         }
925 
926         /**
927          * Acquire permission to skip or process elements.  The caller must
928          * first acquire the elements, then consult this method for guidance
929          * as to what to do with the data.
930          *
931          * <p>We use an {@code AtomicLong} to atomically maintain a counter,
932          * which is initialized as skip+limit if we are limiting, or skip only
933          * if we are not limiting.  The user should consult the method
934          * {@code checkPermits()} before acquiring data elements.
935          *
936          * @param numElements the number of elements the caller has in hand
937          * @return the number of elements that should be processed; any
938          * remaining elements should be discarded.
939          */
acquirePermits(long numElements)940         protected final long acquirePermits(long numElements) {
941             long remainingPermits;
942             long grabbing;
943             // permits never increase, and don't decrease below zero
944             assert numElements > 0;
945             do {
946                 remainingPermits = permits.get();
947                 if (remainingPermits == 0)
948                     return unlimited ? numElements : 0;
949                 grabbing = Math.min(remainingPermits, numElements);
950             } while (grabbing > 0 &&
951                      !permits.compareAndSet(remainingPermits, remainingPermits - grabbing));
952 
953             if (unlimited)
954                 return Math.max(numElements - grabbing, 0);
955             else if (remainingPermits > skipThreshold)
956                 return Math.max(grabbing - (remainingPermits - skipThreshold), 0);
957             else
958                 return grabbing;
959         }
960 
961         enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED }
962 
963         /** Call to check if permits might be available before acquiring data */
permitStatus()964         protected final PermitStatus permitStatus() {
965             if (permits.get() > 0)
966                 return PermitStatus.MAYBE_MORE;
967             else
968                 return unlimited ?  PermitStatus.UNLIMITED : PermitStatus.NO_MORE;
969         }
970 
trySplit()971         public final T_SPLITR trySplit() {
972             // Stop splitting when there are no more limit permits
973             if (permits.get() == 0)
974                 return null;
975             @SuppressWarnings("unchecked")
976             T_SPLITR split = (T_SPLITR) s.trySplit();
977             return split == null ? null : makeSpliterator(split);
978         }
979 
makeSpliterator(T_SPLITR s)980         protected abstract T_SPLITR makeSpliterator(T_SPLITR s);
981 
estimateSize()982         public final long estimateSize() {
983             return s.estimateSize();
984         }
985 
characteristics()986         public final int characteristics() {
987             return s.characteristics() &
988                    ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED);
989         }
990 
991         static final class OfRef<T> extends UnorderedSliceSpliterator<T, Spliterator<T>>
992                 implements Spliterator<T>, Consumer<T> {
993             T tmpSlot;
994 
OfRef(Spliterator<T> s, long skip, long limit)995             OfRef(Spliterator<T> s, long skip, long limit) {
996                 super(s, skip, limit);
997             }
998 
OfRef(Spliterator<T> s, OfRef<T> parent)999             OfRef(Spliterator<T> s, OfRef<T> parent) {
1000                 super(s, parent);
1001             }
1002 
1003             @Override
accept(T t)1004             public final void accept(T t) {
1005                 tmpSlot = t;
1006             }
1007 
1008             @Override
tryAdvance(Consumer<? super T> action)1009             public boolean tryAdvance(Consumer<? super T> action) {
1010                 Objects.requireNonNull(action);
1011 
1012                 while (permitStatus() != PermitStatus.NO_MORE) {
1013                     if (!s.tryAdvance(this))
1014                         return false;
1015                     else if (acquirePermits(1) == 1) {
1016                         action.accept(tmpSlot);
1017                         tmpSlot = null;
1018                         return true;
1019                     }
1020                 }
1021                 return false;
1022             }
1023 
1024             @Override
forEachRemaining(Consumer<? super T> action)1025             public void forEachRemaining(Consumer<? super T> action) {
1026                 Objects.requireNonNull(action);
1027 
1028                 ArrayBuffer.OfRef<T> sb = null;
1029                 PermitStatus permitStatus;
1030                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1031                     if (permitStatus == PermitStatus.MAYBE_MORE) {
1032                         // Optimistically traverse elements up to a threshold of CHUNK_SIZE
1033                         if (sb == null)
1034                             sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
1035                         else
1036                             sb.reset();
1037                         long permitsRequested = 0;
1038                         do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
1039                         if (permitsRequested == 0)
1040                             return;
1041                         sb.forEach(action, acquirePermits(permitsRequested));
1042                     }
1043                     else {
1044                         // Must be UNLIMITED; let 'er rip
1045                         s.forEachRemaining(action);
1046                         return;
1047                     }
1048                 }
1049             }
1050 
1051             @Override
makeSpliterator(Spliterator<T> s)1052             protected Spliterator<T> makeSpliterator(Spliterator<T> s) {
1053                 return new UnorderedSliceSpliterator.OfRef<>(s, this);
1054             }
1055         }
1056 
1057         /**
1058          * Concrete sub-types must also be an instance of type {@code T_CONS}.
1059          *
1060          * @param <T_BUFF> the type of the spined buffer. Must also be a type of
1061          *        {@code T_CONS}.
1062          */
1063         static abstract class OfPrimitive<
1064                 T,
1065                 T_CONS,
1066                 T_BUFF extends ArrayBuffer.OfPrimitive<T_CONS>,
1067                 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
1068                 extends UnorderedSliceSpliterator<T, T_SPLITR>
1069                 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(T_SPLITR s, long skip, long limit)1070             OfPrimitive(T_SPLITR s, long skip, long limit) {
1071                 super(s, skip, limit);
1072             }
1073 
OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent)1074             OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent) {
1075                 super(s, parent);
1076             }
1077 
1078             @Override
tryAdvance(T_CONS action)1079             public boolean tryAdvance(T_CONS action) {
1080                 Objects.requireNonNull(action);
1081                 @SuppressWarnings("unchecked")
1082                 T_CONS consumer = (T_CONS) this;
1083 
1084                 while (permitStatus() != PermitStatus.NO_MORE) {
1085                     if (!s.tryAdvance(consumer))
1086                         return false;
1087                     else if (acquirePermits(1) == 1) {
1088                         acceptConsumed(action);
1089                         return true;
1090                     }
1091                 }
1092                 return false;
1093             }
1094 
acceptConsumed(T_CONS action)1095             protected abstract void acceptConsumed(T_CONS action);
1096 
1097             @Override
forEachRemaining(T_CONS action)1098             public void forEachRemaining(T_CONS action) {
1099                 Objects.requireNonNull(action);
1100 
1101                 T_BUFF sb = null;
1102                 PermitStatus permitStatus;
1103                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1104                     if (permitStatus == PermitStatus.MAYBE_MORE) {
1105                         // Optimistically traverse elements up to a threshold of CHUNK_SIZE
1106                         if (sb == null)
1107                             sb = bufferCreate(CHUNK_SIZE);
1108                         else
1109                             sb.reset();
1110                         @SuppressWarnings("unchecked")
1111                         T_CONS sbc = (T_CONS) sb;
1112                         long permitsRequested = 0;
1113                         do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE);
1114                         if (permitsRequested == 0)
1115                             return;
1116                         sb.forEach(action, acquirePermits(permitsRequested));
1117                     }
1118                     else {
1119                         // Must be UNLIMITED; let 'er rip
1120                         s.forEachRemaining(action);
1121                         return;
1122                     }
1123                 }
1124             }
1125 
bufferCreate(int initialCapacity)1126             protected abstract T_BUFF bufferCreate(int initialCapacity);
1127         }
1128 
1129         static final class OfInt
1130                 extends OfPrimitive<Integer, IntConsumer, ArrayBuffer.OfInt, Spliterator.OfInt>
1131                 implements Spliterator.OfInt, IntConsumer {
1132 
1133             int tmpValue;
1134 
OfInt(Spliterator.OfInt s, long skip, long limit)1135             OfInt(Spliterator.OfInt s, long skip, long limit) {
1136                 super(s, skip, limit);
1137             }
1138 
OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent)1139             OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) {
1140                 super(s, parent);
1141             }
1142 
1143             @Override
accept(int value)1144             public void accept(int value) {
1145                 tmpValue = value;
1146             }
1147 
1148             @Override
acceptConsumed(IntConsumer action)1149             protected void acceptConsumed(IntConsumer action) {
1150                 action.accept(tmpValue);
1151             }
1152 
1153             @Override
bufferCreate(int initialCapacity)1154             protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) {
1155                 return new ArrayBuffer.OfInt(initialCapacity);
1156             }
1157 
1158             @Override
makeSpliterator(Spliterator.OfInt s)1159             protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
1160                 return new UnorderedSliceSpliterator.OfInt(s, this);
1161             }
1162         }
1163 
1164         static final class OfLong
1165                 extends OfPrimitive<Long, LongConsumer, ArrayBuffer.OfLong, Spliterator.OfLong>
1166                 implements Spliterator.OfLong, LongConsumer {
1167 
1168             long tmpValue;
1169 
OfLong(Spliterator.OfLong s, long skip, long limit)1170             OfLong(Spliterator.OfLong s, long skip, long limit) {
1171                 super(s, skip, limit);
1172             }
1173 
OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent)1174             OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) {
1175                 super(s, parent);
1176             }
1177 
1178             @Override
accept(long value)1179             public void accept(long value) {
1180                 tmpValue = value;
1181             }
1182 
1183             @Override
acceptConsumed(LongConsumer action)1184             protected void acceptConsumed(LongConsumer action) {
1185                 action.accept(tmpValue);
1186             }
1187 
1188             @Override
bufferCreate(int initialCapacity)1189             protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) {
1190                 return new ArrayBuffer.OfLong(initialCapacity);
1191             }
1192 
1193             @Override
makeSpliterator(Spliterator.OfLong s)1194             protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
1195                 return new UnorderedSliceSpliterator.OfLong(s, this);
1196             }
1197         }
1198 
1199         static final class OfDouble
1200                 extends OfPrimitive<Double, DoubleConsumer, ArrayBuffer.OfDouble, Spliterator.OfDouble>
1201                 implements Spliterator.OfDouble, DoubleConsumer {
1202 
1203             double tmpValue;
1204 
OfDouble(Spliterator.OfDouble s, long skip, long limit)1205             OfDouble(Spliterator.OfDouble s, long skip, long limit) {
1206                 super(s, skip, limit);
1207             }
1208 
OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent)1209             OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) {
1210                 super(s, parent);
1211             }
1212 
1213             @Override
accept(double value)1214             public void accept(double value) {
1215                 tmpValue = value;
1216             }
1217 
1218             @Override
acceptConsumed(DoubleConsumer action)1219             protected void acceptConsumed(DoubleConsumer action) {
1220                 action.accept(tmpValue);
1221             }
1222 
1223             @Override
bufferCreate(int initialCapacity)1224             protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) {
1225                 return new ArrayBuffer.OfDouble(initialCapacity);
1226             }
1227 
1228             @Override
makeSpliterator(Spliterator.OfDouble s)1229             protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
1230                 return new UnorderedSliceSpliterator.OfDouble(s, this);
1231             }
1232         }
1233     }
1234 
1235     /**
1236      * A wrapping spliterator that only reports distinct elements of the
1237      * underlying spliterator. Does not preserve size and encounter order.
1238      */
1239     static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {
1240 
1241         // The value to represent null in the ConcurrentHashMap
1242         private static final Object NULL_VALUE = new Object();
1243 
1244         // The underlying spliterator
1245         private final Spliterator<T> s;
1246 
1247         // ConcurrentHashMap holding distinct elements as keys
1248         private final ConcurrentHashMap<T, Boolean> seen;
1249 
1250         // Temporary element, only used with tryAdvance
1251         private T tmpSlot;
1252 
DistinctSpliterator(Spliterator<T> s)1253         DistinctSpliterator(Spliterator<T> s) {
1254             this(s, new ConcurrentHashMap<>());
1255         }
1256 
DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen)1257         private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
1258             this.s = s;
1259             this.seen = seen;
1260         }
1261 
1262         @Override
accept(T t)1263         public void accept(T t) {
1264             this.tmpSlot = t;
1265         }
1266 
1267         @SuppressWarnings("unchecked")
mapNull(T t)1268         private T mapNull(T t) {
1269             return t != null ? t : (T) NULL_VALUE;
1270         }
1271 
1272         @Override
tryAdvance(Consumer<? super T> action)1273         public boolean tryAdvance(Consumer<? super T> action) {
1274             while (s.tryAdvance(this)) {
1275                 if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
1276                     action.accept(tmpSlot);
1277                     tmpSlot = null;
1278                     return true;
1279                 }
1280             }
1281             return false;
1282         }
1283 
1284         @Override
forEachRemaining(Consumer<? super T> action)1285         public void forEachRemaining(Consumer<? super T> action) {
1286             s.forEachRemaining(t -> {
1287                 if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
1288                     action.accept(t);
1289                 }
1290             });
1291         }
1292 
1293         @Override
trySplit()1294         public Spliterator<T> trySplit() {
1295             Spliterator<T> split = s.trySplit();
1296             return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
1297         }
1298 
1299         @Override
estimateSize()1300         public long estimateSize() {
1301             return s.estimateSize();
1302         }
1303 
1304         @Override
characteristics()1305         public int characteristics() {
1306             return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
1307                                             Spliterator.SORTED | Spliterator.ORDERED))
1308                    | Spliterator.DISTINCT;
1309         }
1310 
1311         @Override
getComparator()1312         public Comparator<? super T> getComparator() {
1313             return s.getComparator();
1314         }
1315     }
1316 
1317     /**
1318      * A Spliterator that infinitely supplies elements in no particular order.
1319      *
1320      * <p>Splitting divides the estimated size in two and stops when the
1321      * estimate size is 0.
1322      *
1323      * <p>The {@code forEachRemaining} method if invoked will never terminate.
1324      * The {@code tryAdvance} method always returns true.
1325      *
1326      */
1327     static abstract class InfiniteSupplyingSpliterator<T> implements Spliterator<T> {
1328         long estimate;
1329 
InfiniteSupplyingSpliterator(long estimate)1330         protected InfiniteSupplyingSpliterator(long estimate) {
1331             this.estimate = estimate;
1332         }
1333 
1334         @Override
estimateSize()1335         public long estimateSize() {
1336             return estimate;
1337         }
1338 
1339         @Override
characteristics()1340         public int characteristics() {
1341             return IMMUTABLE;
1342         }
1343 
1344         static final class OfRef<T> extends InfiniteSupplyingSpliterator<T> {
1345             final Supplier<T> s;
1346 
OfRef(long size, Supplier<T> s)1347             OfRef(long size, Supplier<T> s) {
1348                 super(size);
1349                 this.s = s;
1350             }
1351 
1352             @Override
tryAdvance(Consumer<? super T> action)1353             public boolean tryAdvance(Consumer<? super T> action) {
1354                 Objects.requireNonNull(action);
1355 
1356                 action.accept(s.get());
1357                 return true;
1358             }
1359 
1360             @Override
trySplit()1361             public Spliterator<T> trySplit() {
1362                 if (estimate == 0)
1363                     return null;
1364                 return new InfiniteSupplyingSpliterator.OfRef<>(estimate >>>= 1, s);
1365             }
1366         }
1367 
1368         static final class OfInt extends InfiniteSupplyingSpliterator<Integer>
1369                 implements Spliterator.OfInt {
1370             final IntSupplier s;
1371 
OfInt(long size, IntSupplier s)1372             OfInt(long size, IntSupplier s) {
1373                 super(size);
1374                 this.s = s;
1375             }
1376 
1377             @Override
tryAdvance(IntConsumer action)1378             public boolean tryAdvance(IntConsumer action) {
1379                 Objects.requireNonNull(action);
1380 
1381                 action.accept(s.getAsInt());
1382                 return true;
1383             }
1384 
1385             @Override
trySplit()1386             public Spliterator.OfInt trySplit() {
1387                 if (estimate == 0)
1388                     return null;
1389                 return new InfiniteSupplyingSpliterator.OfInt(estimate = estimate >>> 1, s);
1390             }
1391         }
1392 
1393         static final class OfLong extends InfiniteSupplyingSpliterator<Long>
1394                 implements Spliterator.OfLong {
1395             final LongSupplier s;
1396 
OfLong(long size, LongSupplier s)1397             OfLong(long size, LongSupplier s) {
1398                 super(size);
1399                 this.s = s;
1400             }
1401 
1402             @Override
tryAdvance(LongConsumer action)1403             public boolean tryAdvance(LongConsumer action) {
1404                 Objects.requireNonNull(action);
1405 
1406                 action.accept(s.getAsLong());
1407                 return true;
1408             }
1409 
1410             @Override
trySplit()1411             public Spliterator.OfLong trySplit() {
1412                 if (estimate == 0)
1413                     return null;
1414                 return new InfiniteSupplyingSpliterator.OfLong(estimate = estimate >>> 1, s);
1415             }
1416         }
1417 
1418         static final class OfDouble extends InfiniteSupplyingSpliterator<Double>
1419                 implements Spliterator.OfDouble {
1420             final DoubleSupplier s;
1421 
OfDouble(long size, DoubleSupplier s)1422             OfDouble(long size, DoubleSupplier s) {
1423                 super(size);
1424                 this.s = s;
1425             }
1426 
1427             @Override
tryAdvance(DoubleConsumer action)1428             public boolean tryAdvance(DoubleConsumer action) {
1429                 Objects.requireNonNull(action);
1430 
1431                 action.accept(s.getAsDouble());
1432                 return true;
1433             }
1434 
1435             @Override
trySplit()1436             public Spliterator.OfDouble trySplit() {
1437                 if (estimate == 0)
1438                     return null;
1439                 return new InfiniteSupplyingSpliterator.OfDouble(estimate = estimate >>> 1, s);
1440             }
1441         }
1442     }
1443 
1444     // @@@ Consolidate with Node.Builder
1445     static abstract class ArrayBuffer {
1446         int index;
1447 
reset()1448         void reset() {
1449             index = 0;
1450         }
1451 
1452         static final class OfRef<T> extends ArrayBuffer implements Consumer<T> {
1453             final Object[] array;
1454 
OfRef(int size)1455             OfRef(int size) {
1456                 this.array = new Object[size];
1457             }
1458 
1459             @Override
accept(T t)1460             public void accept(T t) {
1461                 array[index++] = t;
1462             }
1463 
forEach(Consumer<? super T> action, long fence)1464             public void forEach(Consumer<? super T> action, long fence) {
1465                 for (int i = 0; i < fence; i++) {
1466                     @SuppressWarnings("unchecked")
1467                     T t = (T) array[i];
1468                     action.accept(t);
1469                 }
1470             }
1471         }
1472 
1473         static abstract class OfPrimitive<T_CONS> extends ArrayBuffer {
1474             int index;
1475 
1476             @Override
reset()1477             void reset() {
1478                 index = 0;
1479             }
1480 
forEach(T_CONS action, long fence)1481             abstract void forEach(T_CONS action, long fence);
1482         }
1483 
1484         static final class OfInt extends OfPrimitive<IntConsumer>
1485                 implements IntConsumer {
1486             final int[] array;
1487 
OfInt(int size)1488             OfInt(int size) {
1489                 this.array = new int[size];
1490             }
1491 
1492             @Override
accept(int t)1493             public void accept(int t) {
1494                 array[index++] = t;
1495             }
1496 
1497             @Override
forEach(IntConsumer action, long fence)1498             public void forEach(IntConsumer action, long fence) {
1499                 for (int i = 0; i < fence; i++) {
1500                     action.accept(array[i]);
1501                 }
1502             }
1503         }
1504 
1505         static final class OfLong extends OfPrimitive<LongConsumer>
1506                 implements LongConsumer {
1507             final long[] array;
1508 
OfLong(int size)1509             OfLong(int size) {
1510                 this.array = new long[size];
1511             }
1512 
1513             @Override
accept(long t)1514             public void accept(long t) {
1515                 array[index++] = t;
1516             }
1517 
1518             @Override
forEach(LongConsumer action, long fence)1519             public void forEach(LongConsumer action, long fence) {
1520                 for (int i = 0; i < fence; i++) {
1521                     action.accept(array[i]);
1522                 }
1523             }
1524         }
1525 
1526         static final class OfDouble extends OfPrimitive<DoubleConsumer>
1527                 implements DoubleConsumer {
1528             final double[] array;
1529 
OfDouble(int size)1530             OfDouble(int size) {
1531                 this.array = new double[size];
1532             }
1533 
1534             @Override
accept(double t)1535             public void accept(double t) {
1536                 array[index++] = t;
1537             }
1538 
1539             @Override
forEach(DoubleConsumer action, long fence)1540             void forEach(DoubleConsumer action, long fence) {
1541                 for (int i = 0; i < fence; i++) {
1542                     action.accept(array[i]);
1543                 }
1544             }
1545         }
1546     }
1547 }
1548 
1549