1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Comparator;
30 import java.util.Objects;
31 import java.util.Spliterator;
32 import java.util.function.IntFunction;
33 
34 
35 /**
36  * Factory methods for transforming streams into sorted streams.
37  *
38  * @since 1.8
39  */
40 final class SortedOps {
41 
SortedOps()42     private SortedOps() { }
43 
44     /**
45      * Appends a "sorted" operation to the provided stream.
46      *
47      * @param <T> the type of both input and output elements
48      * @param upstream a reference stream with element type T
49      */
makeRef(AbstractPipeline<?, T, ?> upstream)50     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
51         return new OfRef<>(upstream);
52     }
53 
54     /**
55      * Appends a "sorted" operation to the provided stream.
56      *
57      * @param <T> the type of both input and output elements
58      * @param upstream a reference stream with element type T
59      * @param comparator the comparator to order elements by
60      */
makeRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator)61     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
62                                 Comparator<? super T> comparator) {
63         return new OfRef<>(upstream, comparator);
64     }
65 
66     /**
67      * Appends a "sorted" operation to the provided stream.
68      *
69      * @param <T> the type of both input and output elements
70      * @param upstream a reference stream with element type T
71      */
makeInt(AbstractPipeline<?, Integer, ?> upstream)72     static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
73         return new OfInt(upstream);
74     }
75 
76     /**
77      * Appends a "sorted" operation to the provided stream.
78      *
79      * @param <T> the type of both input and output elements
80      * @param upstream a reference stream with element type T
81      */
makeLong(AbstractPipeline<?, Long, ?> upstream)82     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
83         return new OfLong(upstream);
84     }
85 
86     /**
87      * Appends a "sorted" operation to the provided stream.
88      *
89      * @param <T> the type of both input and output elements
90      * @param upstream a reference stream with element type T
91      */
makeDouble(AbstractPipeline<?, Double, ?> upstream)92     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
93         return new OfDouble(upstream);
94     }
95 
96     /**
97      * Specialized subtype for sorting reference streams
98      */
99     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
100         /**
101          * Comparator used for sorting
102          */
103         private final boolean isNaturalSort;
104         private final Comparator<? super T> comparator;
105 
106         /**
107          * Sort using natural order of {@literal <T>} which must be
108          * {@code Comparable}.
109          */
OfRef(AbstractPipeline<?, T, ?> upstream)110         OfRef(AbstractPipeline<?, T, ?> upstream) {
111             super(upstream, StreamShape.REFERENCE,
112                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
113             this.isNaturalSort = true;
114             // Will throw CCE when we try to sort if T is not Comparable
115             @SuppressWarnings("unchecked")
116             Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
117             this.comparator = comp;
118         }
119 
120         /**
121          * Sort using the provided comparator.
122          *
123          * @param comparator The comparator to be used to evaluate ordering.
124          */
OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator)125         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
126             super(upstream, StreamShape.REFERENCE,
127                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
128             this.isNaturalSort = false;
129             this.comparator = Objects.requireNonNull(comparator);
130         }
131 
132         @Override
opWrapSink(int flags, Sink<T> sink)133         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
134             Objects.requireNonNull(sink);
135 
136             // If the input is already naturally sorted and this operation
137             // also naturally sorted then this is a no-op
138             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
139                 return sink;
140             else if (StreamOpFlag.SIZED.isKnown(flags))
141                 return new SizedRefSortingSink<>(sink, comparator);
142             else
143                 return new RefSortingSink<>(sink, comparator);
144         }
145 
146         @Override
opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator)147         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
148                                                  Spliterator<P_IN> spliterator,
149                                                  IntFunction<T[]> generator) {
150             // If the input is already naturally sorted and this operation
151             // naturally sorts then collect the output
152             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
153                 return helper.evaluate(spliterator, false, generator);
154             }
155             else {
156                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
157                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
158                 Arrays.parallelSort(flattenedData, comparator);
159                 return Nodes.node(flattenedData);
160             }
161         }
162     }
163 
164     /**
165      * Specialized subtype for sorting int streams.
166      */
167     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
OfInt(AbstractPipeline<?, Integer, ?> upstream)168         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
169             super(upstream, StreamShape.INT_VALUE,
170                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
171         }
172 
173         @Override
opWrapSink(int flags, Sink<Integer> sink)174         public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
175             Objects.requireNonNull(sink);
176 
177             if (StreamOpFlag.SORTED.isKnown(flags))
178                 return sink;
179             else if (StreamOpFlag.SIZED.isKnown(flags))
180                 return new SizedIntSortingSink(sink);
181             else
182                 return new IntSortingSink(sink);
183         }
184 
185         @Override
opEvaluateParallel(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, IntFunction<Integer[]> generator)186         public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
187                                                        Spliterator<P_IN> spliterator,
188                                                        IntFunction<Integer[]> generator) {
189             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
190                 return helper.evaluate(spliterator, false, generator);
191             }
192             else {
193                 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
194 
195                 int[] content = n.asPrimitiveArray();
196                 Arrays.parallelSort(content);
197 
198                 return Nodes.node(content);
199             }
200         }
201     }
202 
203     /**
204      * Specialized subtype for sorting long streams.
205      */
206     private static final class OfLong extends LongPipeline.StatefulOp<Long> {
OfLong(AbstractPipeline<?, Long, ?> upstream)207         OfLong(AbstractPipeline<?, Long, ?> upstream) {
208             super(upstream, StreamShape.LONG_VALUE,
209                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
210         }
211 
212         @Override
opWrapSink(int flags, Sink<Long> sink)213         public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
214             Objects.requireNonNull(sink);
215 
216             if (StreamOpFlag.SORTED.isKnown(flags))
217                 return sink;
218             else if (StreamOpFlag.SIZED.isKnown(flags))
219                 return new SizedLongSortingSink(sink);
220             else
221                 return new LongSortingSink(sink);
222         }
223 
224         @Override
opEvaluateParallel(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, IntFunction<Long[]> generator)225         public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
226                                                     Spliterator<P_IN> spliterator,
227                                                     IntFunction<Long[]> generator) {
228             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
229                 return helper.evaluate(spliterator, false, generator);
230             }
231             else {
232                 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
233 
234                 long[] content = n.asPrimitiveArray();
235                 Arrays.parallelSort(content);
236 
237                 return Nodes.node(content);
238             }
239         }
240     }
241 
242     /**
243      * Specialized subtype for sorting double streams.
244      */
245     private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
OfDouble(AbstractPipeline<?, Double, ?> upstream)246         OfDouble(AbstractPipeline<?, Double, ?> upstream) {
247             super(upstream, StreamShape.DOUBLE_VALUE,
248                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
249         }
250 
251         @Override
opWrapSink(int flags, Sink<Double> sink)252         public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
253             Objects.requireNonNull(sink);
254 
255             if (StreamOpFlag.SORTED.isKnown(flags))
256                 return sink;
257             else if (StreamOpFlag.SIZED.isKnown(flags))
258                 return new SizedDoubleSortingSink(sink);
259             else
260                 return new DoubleSortingSink(sink);
261         }
262 
263         @Override
opEvaluateParallel(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, IntFunction<Double[]> generator)264         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
265                                                       Spliterator<P_IN> spliterator,
266                                                       IntFunction<Double[]> generator) {
267             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
268                 return helper.evaluate(spliterator, false, generator);
269             }
270             else {
271                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
272 
273                 double[] content = n.asPrimitiveArray();
274                 Arrays.parallelSort(content);
275 
276                 return Nodes.node(content);
277             }
278         }
279     }
280 
281     /**
282      * Abstract {@link Sink} for implementing sort on reference streams.
283      *
284      * <p>
285      * Note: documentation below applies to reference and all primitive sinks.
286      * <p>
287      * Sorting sinks first accept all elements, buffering then into an array
288      * or a re-sizable data structure, if the size of the pipeline is known or
289      * unknown respectively.  At the end of the sink protocol those elements are
290      * sorted and then pushed downstream.
291      * This class records if {@link #cancellationRequested} is called.  If so it
292      * can be inferred that the source pushing source elements into the pipeline
293      * knows that the pipeline is short-circuiting.  In such cases sub-classes
294      * pushing elements downstream will preserve the short-circuiting protocol
295      * by calling {@code downstream.cancellationRequested()} and checking the
296      * result is {@code false} before an element is pushed.
297      * <p>
298      * Note that the above behaviour is an optimization for sorting with
299      * sequential streams.  It is not an error that more elements, than strictly
300      * required to produce a result, may flow through the pipeline.  This can
301      * occur, in general (not restricted to just sorting), for short-circuiting
302      * parallel pipelines.
303      */
304     private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
305         protected final Comparator<? super T> comparator;
306         // @@@ could be a lazy final value, if/when support is added
307         protected boolean cancellationWasRequested;
308 
AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator)309         AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
310             super(downstream);
311             this.comparator = comparator;
312         }
313 
314         /**
315          * Records is cancellation is requested so short-circuiting behaviour
316          * can be preserved when the sorted elements are pushed downstream.
317          *
318          * @return false, as this sink never short-circuits.
319          */
320         @Override
cancellationRequested()321         public final boolean cancellationRequested() {
322             cancellationWasRequested = true;
323             return false;
324         }
325     }
326 
327     /**
328      * {@link Sink} for implementing sort on SIZED reference streams.
329      */
330     private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
331         private T[] array;
332         private int offset;
333 
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator)334         SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
335             super(sink, comparator);
336         }
337 
338         @Override
339         @SuppressWarnings("unchecked")
begin(long size)340         public void begin(long size) {
341             if (size >= Nodes.MAX_ARRAY_SIZE)
342                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
343             array = (T[]) new Object[(int) size];
344         }
345 
346         @Override
end()347         public void end() {
348             Arrays.sort(array, 0, offset, comparator);
349             downstream.begin(offset);
350             if (!cancellationWasRequested) {
351                 for (int i = 0; i < offset; i++)
352                     downstream.accept(array[i]);
353             }
354             else {
355                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
356                     downstream.accept(array[i]);
357             }
358             downstream.end();
359             array = null;
360         }
361 
362         @Override
accept(T t)363         public void accept(T t) {
364             array[offset++] = t;
365         }
366     }
367 
368     /**
369      * {@link Sink} for implementing sort on reference streams.
370      */
371     private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
372         private ArrayList<T> list;
373 
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator)374         RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
375             super(sink, comparator);
376         }
377 
378         @Override
begin(long size)379         public void begin(long size) {
380             if (size >= Nodes.MAX_ARRAY_SIZE)
381                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
382             list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
383         }
384 
385         @Override
end()386         public void end() {
387             list.sort(comparator);
388             downstream.begin(list.size());
389             if (!cancellationWasRequested) {
390                 list.forEach(downstream::accept);
391             }
392             else {
393                 for (T t : list) {
394                     if (downstream.cancellationRequested()) break;
395                     downstream.accept(t);
396                 }
397             }
398             downstream.end();
399             list = null;
400         }
401 
402         @Override
accept(T t)403         public void accept(T t) {
404             list.add(t);
405         }
406     }
407 
408     /**
409      * Abstract {@link Sink} for implementing sort on int streams.
410      */
411     private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
412         protected boolean cancellationWasRequested;
413 
AbstractIntSortingSink(Sink<? super Integer> downstream)414         AbstractIntSortingSink(Sink<? super Integer> downstream) {
415             super(downstream);
416         }
417 
418         @Override
cancellationRequested()419         public final boolean cancellationRequested() {
420             cancellationWasRequested = true;
421             return false;
422         }
423     }
424 
425     /**
426      * {@link Sink} for implementing sort on SIZED int streams.
427      */
428     private static final class SizedIntSortingSink extends AbstractIntSortingSink {
429         private int[] array;
430         private int offset;
431 
SizedIntSortingSink(Sink<? super Integer> downstream)432         SizedIntSortingSink(Sink<? super Integer> downstream) {
433             super(downstream);
434         }
435 
436         @Override
begin(long size)437         public void begin(long size) {
438             if (size >= Nodes.MAX_ARRAY_SIZE)
439                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
440             array = new int[(int) size];
441         }
442 
443         @Override
end()444         public void end() {
445             Arrays.sort(array, 0, offset);
446             downstream.begin(offset);
447             if (!cancellationWasRequested) {
448                 for (int i = 0; i < offset; i++)
449                     downstream.accept(array[i]);
450             }
451             else {
452                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
453                     downstream.accept(array[i]);
454             }
455             downstream.end();
456             array = null;
457         }
458 
459         @Override
accept(int t)460         public void accept(int t) {
461             array[offset++] = t;
462         }
463     }
464 
465     /**
466      * {@link Sink} for implementing sort on int streams.
467      */
468     private static final class IntSortingSink extends AbstractIntSortingSink {
469         private SpinedBuffer.OfInt b;
470 
IntSortingSink(Sink<? super Integer> sink)471         IntSortingSink(Sink<? super Integer> sink) {
472             super(sink);
473         }
474 
475         @Override
begin(long size)476         public void begin(long size) {
477             if (size >= Nodes.MAX_ARRAY_SIZE)
478                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
479             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
480         }
481 
482         @Override
end()483         public void end() {
484             int[] ints = b.asPrimitiveArray();
485             Arrays.sort(ints);
486             downstream.begin(ints.length);
487             if (!cancellationWasRequested) {
488                 for (int anInt : ints)
489                     downstream.accept(anInt);
490             }
491             else {
492                 for (int anInt : ints) {
493                     if (downstream.cancellationRequested()) break;
494                     downstream.accept(anInt);
495                 }
496             }
497             downstream.end();
498         }
499 
500         @Override
accept(int t)501         public void accept(int t) {
502             b.accept(t);
503         }
504     }
505 
506     /**
507      * Abstract {@link Sink} for implementing sort on long streams.
508      */
509     private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
510         protected boolean cancellationWasRequested;
511 
AbstractLongSortingSink(Sink<? super Long> downstream)512         AbstractLongSortingSink(Sink<? super Long> downstream) {
513             super(downstream);
514         }
515 
516         @Override
cancellationRequested()517         public final boolean cancellationRequested() {
518             cancellationWasRequested = true;
519             return false;
520         }
521     }
522 
523     /**
524      * {@link Sink} for implementing sort on SIZED long streams.
525      */
526     private static final class SizedLongSortingSink extends AbstractLongSortingSink {
527         private long[] array;
528         private int offset;
529 
SizedLongSortingSink(Sink<? super Long> downstream)530         SizedLongSortingSink(Sink<? super Long> downstream) {
531             super(downstream);
532         }
533 
534         @Override
begin(long size)535         public void begin(long size) {
536             if (size >= Nodes.MAX_ARRAY_SIZE)
537                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
538             array = new long[(int) size];
539         }
540 
541         @Override
end()542         public void end() {
543             Arrays.sort(array, 0, offset);
544             downstream.begin(offset);
545             if (!cancellationWasRequested) {
546                 for (int i = 0; i < offset; i++)
547                     downstream.accept(array[i]);
548             }
549             else {
550                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
551                     downstream.accept(array[i]);
552             }
553             downstream.end();
554             array = null;
555         }
556 
557         @Override
accept(long t)558         public void accept(long t) {
559             array[offset++] = t;
560         }
561     }
562 
563     /**
564      * {@link Sink} for implementing sort on long streams.
565      */
566     private static final class LongSortingSink extends AbstractLongSortingSink {
567         private SpinedBuffer.OfLong b;
568 
LongSortingSink(Sink<? super Long> sink)569         LongSortingSink(Sink<? super Long> sink) {
570             super(sink);
571         }
572 
573         @Override
begin(long size)574         public void begin(long size) {
575             if (size >= Nodes.MAX_ARRAY_SIZE)
576                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
577             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
578         }
579 
580         @Override
end()581         public void end() {
582             long[] longs = b.asPrimitiveArray();
583             Arrays.sort(longs);
584             downstream.begin(longs.length);
585             if (!cancellationWasRequested) {
586                 for (long aLong : longs)
587                     downstream.accept(aLong);
588             }
589             else {
590                 for (long aLong : longs) {
591                     if (downstream.cancellationRequested()) break;
592                     downstream.accept(aLong);
593                 }
594             }
595             downstream.end();
596         }
597 
598         @Override
accept(long t)599         public void accept(long t) {
600             b.accept(t);
601         }
602     }
603 
604     /**
605      * Abstract {@link Sink} for implementing sort on long streams.
606      */
607     private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
608         protected boolean cancellationWasRequested;
609 
AbstractDoubleSortingSink(Sink<? super Double> downstream)610         AbstractDoubleSortingSink(Sink<? super Double> downstream) {
611             super(downstream);
612         }
613 
614         @Override
cancellationRequested()615         public final boolean cancellationRequested() {
616             cancellationWasRequested = true;
617             return false;
618         }
619     }
620 
621     /**
622      * {@link Sink} for implementing sort on SIZED double streams.
623      */
624     private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
625         private double[] array;
626         private int offset;
627 
SizedDoubleSortingSink(Sink<? super Double> downstream)628         SizedDoubleSortingSink(Sink<? super Double> downstream) {
629             super(downstream);
630         }
631 
632         @Override
begin(long size)633         public void begin(long size) {
634             if (size >= Nodes.MAX_ARRAY_SIZE)
635                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
636             array = new double[(int) size];
637         }
638 
639         @Override
end()640         public void end() {
641             Arrays.sort(array, 0, offset);
642             downstream.begin(offset);
643             if (!cancellationWasRequested) {
644                 for (int i = 0; i < offset; i++)
645                     downstream.accept(array[i]);
646             }
647             else {
648                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
649                     downstream.accept(array[i]);
650             }
651             downstream.end();
652             array = null;
653         }
654 
655         @Override
accept(double t)656         public void accept(double t) {
657             array[offset++] = t;
658         }
659     }
660 
661     /**
662      * {@link Sink} for implementing sort on double streams.
663      */
664     private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
665         private SpinedBuffer.OfDouble b;
666 
DoubleSortingSink(Sink<? super Double> sink)667         DoubleSortingSink(Sink<? super Double> sink) {
668             super(sink);
669         }
670 
671         @Override
begin(long size)672         public void begin(long size) {
673             if (size >= Nodes.MAX_ARRAY_SIZE)
674                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
675             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
676         }
677 
678         @Override
end()679         public void end() {
680             double[] doubles = b.asPrimitiveArray();
681             Arrays.sort(doubles);
682             downstream.begin(doubles.length);
683             if (!cancellationWasRequested) {
684                 for (double aDouble : doubles)
685                     downstream.accept(aDouble);
686             }
687             else {
688                 for (double aDouble : doubles) {
689                     if (downstream.cancellationRequested()) break;
690                     downstream.accept(aDouble);
691                 }
692             }
693             downstream.end();
694         }
695 
696         @Override
accept(double t)697         public void accept(double t) {
698             b.accept(t);
699         }
700     }
701 }
702