1 /*
2  * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 package org.openjdk.tests.java.util.stream;
24 
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.IntSummaryStatistics;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Optional;
37 import java.util.Set;
38 import java.util.StringJoiner;
39 import java.util.TreeMap;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ConcurrentSkipListMap;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.function.BiFunction;
44 import java.util.function.BinaryOperator;
45 import java.util.function.Function;
46 import java.util.function.Predicate;
47 import java.util.function.Supplier;
48 import java.util.stream.Collector;
49 import java.util.stream.Collectors;
50 import org.openjdk.testlib.java.util.stream.LambdaTestHelpers;
51 import org.openjdk.testlib.java.util.stream.OpTestCase;
52 import java.util.stream.Stream;
53 import org.openjdk.testlib.java.util.stream.StreamOpFlagTestHelper;
54 import org.openjdk.testlib.java.util.stream.StreamTestDataProvider;
55 import org.openjdk.testlib.java.util.stream.TestData;
56 
57 import org.testng.annotations.Test;
58 
59 import static java.util.stream.Collectors.collectingAndThen;
60 import static java.util.stream.Collectors.flatMapping;
61 import static java.util.stream.Collectors.filtering;
62 import static java.util.stream.Collectors.groupingBy;
63 import static java.util.stream.Collectors.groupingByConcurrent;
64 import static java.util.stream.Collectors.mapping;
65 import static java.util.stream.Collectors.partitioningBy;
66 import static java.util.stream.Collectors.reducing;
67 import static java.util.stream.Collectors.toCollection;
68 import static java.util.stream.Collectors.toConcurrentMap;
69 import static java.util.stream.Collectors.toList;
70 import static java.util.stream.Collectors.toMap;
71 import static java.util.stream.Collectors.toSet;
72 import static org.openjdk.testlib.java.util.stream.LambdaTestHelpers.assertContents;
73 import static org.openjdk.testlib.java.util.stream.LambdaTestHelpers.assertContentsUnordered;
74 import static org.openjdk.testlib.java.util.stream.LambdaTestHelpers.mDoubler;
75 
76 /*
77  * @test
78  * @bug 8071600 8144675
79  * @summary Test for collectors.
80  */
81 public class CollectorsTest extends OpTestCase {
82 
83     private abstract static class CollectorAssertion<T, U> {
assertValue(U value, Supplier<Stream<T>> source, boolean ordered)84         abstract void assertValue(U value,
85                                   Supplier<Stream<T>> source,
86                                   boolean ordered) throws ReflectiveOperationException;
87     }
88 
89     static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
90         private final Function<T, V> mapper;
91         private final CollectorAssertion<V, R> downstream;
92 
MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream)93         MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
94             this.mapper = mapper;
95             this.downstream = downstream;
96         }
97 
98         @Override
assertValue(R value, Supplier<Stream<T>> source, boolean ordered)99         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
100             downstream.assertValue(value,
101                                    () -> source.get().map(mapper),
102                                    ordered);
103         }
104     }
105 
106     static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
107         private final Function<T, Stream<V>> mapper;
108         private final CollectorAssertion<V, R> downstream;
109 
FlatMappingAssertion(Function<T, Stream<V>> mapper, CollectorAssertion<V, R> downstream)110         FlatMappingAssertion(Function<T, Stream<V>> mapper,
111                              CollectorAssertion<V, R> downstream) {
112             this.mapper = mapper;
113             this.downstream = downstream;
114         }
115 
116         @Override
assertValue(R value, Supplier<Stream<T>> source, boolean ordered)117         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
118             downstream.assertValue(value,
119                                    () -> source.get().flatMap(mapper),
120                                    ordered);
121         }
122     }
123 
124     static class FilteringAssertion<T, R> extends CollectorAssertion<T, R> {
125         private final Predicate<T> filter;
126         private final CollectorAssertion<T, R> downstream;
127 
FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream)128         public FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream) {
129             this.filter = filter;
130             this.downstream = downstream;
131         }
132 
133         @Override
assertValue(R value, Supplier<Stream<T>> source, boolean ordered)134         void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
135             downstream.assertValue(value,
136                                    () -> source.get().filter(filter),
137                                    ordered);
138         }
139     }
140 
141     static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
142         private final Class<? extends Map> clazz;
143         private final Function<T, K> classifier;
144         private final CollectorAssertion<T,V> downstream;
145 
GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz, CollectorAssertion<T, V> downstream)146         GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
147                             CollectorAssertion<T, V> downstream) {
148             this.clazz = clazz;
149             this.classifier = classifier;
150             this.downstream = downstream;
151         }
152 
153         @Override
assertValue(M map, Supplier<Stream<T>> source, boolean ordered)154         void assertValue(M map,
155                          Supplier<Stream<T>> source,
156                          boolean ordered) throws ReflectiveOperationException {
157             if (!clazz.isAssignableFrom(map.getClass()))
158                 fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
159             assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
160             for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
161                 K key = entry.getKey();
162                 downstream.assertValue(entry.getValue(),
163                                        () -> source.get().filter(e -> classifier.apply(e).equals(key)),
164                                        ordered);
165             }
166         }
167     }
168 
169     static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
170         private final Class<? extends Map> clazz;
171         private final Function<T, K> keyFn;
172         private final Function<T, V> valueFn;
173         private final BinaryOperator<V> mergeFn;
174 
ToMapAssertion(Function<T, K> keyFn, Function<T, V> valueFn, BinaryOperator<V> mergeFn, Class<? extends Map> clazz)175         ToMapAssertion(Function<T, K> keyFn,
176                        Function<T, V> valueFn,
177                        BinaryOperator<V> mergeFn,
178                        Class<? extends Map> clazz) {
179             this.clazz = clazz;
180             this.keyFn = keyFn;
181             this.valueFn = valueFn;
182             this.mergeFn = mergeFn;
183         }
184 
185         @Override
assertValue(M map, Supplier<Stream<T>> source, boolean ordered)186         void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
187             if (!clazz.isAssignableFrom(map.getClass()))
188                 fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
189             Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
190             assertEquals(uniqueKeys, map.keySet());
191             source.get().forEach(t -> {
192                 K key = keyFn.apply(t);
193                 V v = source.get()
194                             .filter(e -> key.equals(keyFn.apply(e)))
195                             .map(valueFn)
196                             .reduce(mergeFn)
197                             .get();
198                 assertEquals(map.get(key), v);
199             });
200         }
201     }
202 
203     static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
204         private final Predicate<T> predicate;
205         private final CollectorAssertion<T,D> downstream;
206 
PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream)207         PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
208             this.predicate = predicate;
209             this.downstream = downstream;
210         }
211 
212         @Override
assertValue(Map<Boolean, D> map, Supplier<Stream<T>> source, boolean ordered)213         void assertValue(Map<Boolean, D> map,
214                          Supplier<Stream<T>> source,
215                          boolean ordered) throws ReflectiveOperationException {
216             if (!Map.class.isAssignableFrom(map.getClass()))
217                 fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
218             assertEquals(2, map.size());
219             downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
220             downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
221         }
222     }
223 
224     static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
225         @Override
assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)226         void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
227                 throws ReflectiveOperationException {
228             if (!List.class.isAssignableFrom(value.getClass()))
229                 fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
230             Stream<T> stream = source.get();
231             List<T> result = new ArrayList<>();
232             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
233                 result.add(it.next());
234             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
235                 assertContents(value, result);
236             else
237                 assertContentsUnordered(value, result);
238         }
239     }
240 
241     static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
242         private final Class<? extends Collection> clazz;
243         private final boolean targetOrdered;
244 
ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered)245         ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
246             this.clazz = clazz;
247             this.targetOrdered = targetOrdered;
248         }
249 
250         @Override
assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)251         void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
252                 throws ReflectiveOperationException {
253             if (!clazz.isAssignableFrom(value.getClass()))
254                 fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
255             Stream<T> stream = source.get();
256             Collection<T> result = clazz.newInstance();
257             for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
258                 result.add(it.next());
259             if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
260                 assertContents(value, result);
261             else
262                 assertContentsUnordered(value, result);
263         }
264     }
265 
266     static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
267         private final U identity;
268         private final Function<T, U> mapper;
269         private final BinaryOperator<U> reducer;
270 
ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer)271         ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
272             this.identity = identity;
273             this.mapper = mapper;
274             this.reducer = reducer;
275         }
276 
277         @Override
assertValue(U value, Supplier<Stream<T>> source, boolean ordered)278         void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
279                 throws ReflectiveOperationException {
280             Optional<U> reduced = source.get().map(mapper).reduce(reducer);
281             if (value == null)
282                 assertTrue(!reduced.isPresent());
283             else if (!reduced.isPresent()) {
284                 assertEquals(value, identity);
285             }
286             else {
287                 assertEquals(value, reduced.get());
288             }
289         }
290     }
291 
292     static class TeeingAssertion<T, R1, R2, RR> extends CollectorAssertion<T, RR> {
293         private final Collector<T, ?, R1> c1;
294         private final Collector<T, ?, R2> c2;
295         private final BiFunction<? super R1, ? super R2, ? extends RR> finisher;
296 
TeeingAssertion(Collector<T, ?, R1> c1, Collector<T, ?, R2> c2, BiFunction<? super R1, ? super R2, ? extends RR> finisher)297         TeeingAssertion(Collector<T, ?, R1> c1, Collector<T, ?, R2> c2,
298                                BiFunction<? super R1, ? super R2, ? extends RR> finisher) {
299             this.c1 = c1;
300             this.c2 = c2;
301             this.finisher = finisher;
302         }
303 
304         @Override
assertValue(RR value, Supplier<Stream<T>> source, boolean ordered)305         void assertValue(RR value, Supplier<Stream<T>> source, boolean ordered) {
306             R1 r1 = source.get().collect(c1);
307             R2 r2 = source.get().collect(c2);
308             RR expected = finisher.apply(r1, r2);
309             assertEquals(value, expected);
310         }
311     }
312 
mapTabulationAsserter(boolean ordered)313     private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
314         return (act, exp, ord, par) -> {
315             if (par && (!ordered || !ord)) {
316                 CollectorsTest.nestedMapEqualityAssertion(act, exp);
317             }
318             else {
319                 LambdaTestHelpers.assertContentsEqual(act, exp);
320             }
321         };
322     }
323 
324     private<T, M extends Map>
325     void exerciseMapCollection(TestData<T, Stream<T>> data,
326                                Collector<T, ?, ? extends M> collector,
327                                CollectorAssertion<T, M> assertion)
328             throws ReflectiveOperationException {
329         boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
330 
331         M m = withData(data)
332                 .terminal(s -> s.collect(collector))
333                 .resultAsserter(mapTabulationAsserter(ordered))
334                 .exercise();
335         assertion.assertValue(m, () -> data.stream(), ordered);
336 
337         m = withData(data)
338                 .terminal(s -> s.unordered().collect(collector))
339                 .resultAsserter(mapTabulationAsserter(ordered))
340                 .exercise();
341         assertion.assertValue(m, () -> data.stream(), false);
342     }
343 
344     private static void nestedMapEqualityAssertion(Object o1, Object o2) {
345         if (o1 instanceof Map) {
346             Map m1 = (Map) o1;
347             Map m2 = (Map) o2;
348             assertContentsUnordered(m1.keySet(), m2.keySet());
349             for (Object k : m1.keySet())
350                 nestedMapEqualityAssertion(m1.get(k), m2.get(k));
351         }
352         else if (o1 instanceof Collection) {
353             assertContentsUnordered(((Collection) o1), ((Collection) o2));
354         }
355         else
356             assertEquals(o1, o2);
357     }
358 
359     private<T, R> void assertCollect(TestData.OfRef<T> data,
360                                      Collector<T, ?, R> collector,
361                                      Function<Stream<T>, R> streamReduction) {
362         R check = streamReduction.apply(data.stream());
363         withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
364     }
365 
366     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
367     public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
368         assertCollect(data, Collectors.reducing(0, Integer::sum),
369                       s -> s.reduce(0, Integer::sum));
370         assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
371                       s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
372         assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
373                       s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
374 
375         assertCollect(data, Collectors.reducing(Integer::sum),
376                       s -> s.reduce(Integer::sum));
377         assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
378                       s -> s.min(Integer::compare));
379         assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
380                       s -> s.max(Integer::compare));
381 
382         assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
383                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
384 
385         assertCollect(data, Collectors.summingLong(x -> x * 2L),
386                       s -> s.map(x -> x*2L).reduce(0L, Long::sum));
387         assertCollect(data, Collectors.summingInt(x -> x * 2),
388                       s -> s.map(x -> x*2).reduce(0, Integer::sum));
389         assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
390                       s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
391 
392         assertCollect(data, Collectors.averagingInt(x -> x * 2),
393                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
394         assertCollect(data, Collectors.averagingLong(x -> x * 2),
395                       s -> s.mapToLong(x -> x * 2).average().orElse(0));
396         assertCollect(data, Collectors.averagingDouble(x -> x * 2),
397                       s -> s.mapToDouble(x -> x * 2).average().orElse(0));
398 
399         // Test explicit Collector.of
400         Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
401                                                                    (a, b) -> {
402                                                                        a[0] += b * 2;
403                                                                        a[1]++;
404                                                                    },
405                                                                    (a, b) -> {
406                                                                        a[0] += b[0];
407                                                                        a[1] += b[1];
408                                                                        return a;
409                                                                    },
410                                                                    a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
411         assertCollect(data, avg2xint,
412                       s -> s.mapToInt(x -> x * 2).average().orElse(0));
413     }
414 
415     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
416     public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
417         withData(data)
418                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
419                 .expectedResult(join(data, ""))
420                 .exercise();
421 
422         Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
423         withData(data)
424                 .terminal(s -> s.map(Object::toString).collect(likeJoining))
425                 .expectedResult(join(data, ""))
426                 .exercise();
427 
428         withData(data)
429                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
430                 .expectedResult(join(data, ","))
431                 .exercise();
432 
433         withData(data)
434                 .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
435                 .expectedResult("[" + join(data, ",") + "]")
436                 .exercise();
437 
438         withData(data)
439                 .terminal(s -> s.map(Object::toString)
440                                 .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
441                                 .toString())
442                 .expectedResult(join(data, ""))
443                 .exercise();
444 
445         withData(data)
446                 .terminal(s -> s.map(Object::toString)
447                                 .collect(() -> new StringJoiner(","),
448                                          (sj, cs) -> sj.add(cs),
449                                          (j1, j2) -> j1.merge(j2))
450                                 .toString())
451                 .expectedResult(join(data, ","))
452                 .exercise();
453 
454         withData(data)
455                 .terminal(s -> s.map(Object::toString)
456                                 .collect(() -> new StringJoiner(",", "[", "]"),
457                                          (sj, cs) -> sj.add(cs),
458                                          (j1, j2) -> j1.merge(j2))
459                                 .toString())
460                 .expectedResult("[" + join(data, ",") + "]")
461                 .exercise();
462     }
463 
464     private<T> String join(TestData.OfRef<T> data, String delim) {
465         StringBuilder sb = new StringBuilder();
466         boolean first = true;
467         for (T i : data) {
468             if (!first)
469                 sb.append(delim);
470             sb.append(i.toString());
471             first = false;
472         }
473         return sb.toString();
474     }
475 
476     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
477     public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
478         Function<Integer, Integer> keyFn = i -> i * 2;
479         Function<Integer, Integer> valueFn = i -> i * 4;
480 
481         List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
482         Set<Integer> dataAsSet = new HashSet<>(dataAsList);
483 
484         BinaryOperator<Integer> sum = Integer::sum;
485         for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
486                                                         (u, v) -> v,
487                                                         sum)) {
488             try {
489                 exerciseMapCollection(data, toMap(keyFn, valueFn),
490                                       new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
491                 if (dataAsList.size() != dataAsSet.size())
492                     fail("Expected ISE on input with duplicates");
493             }
494             catch (IllegalStateException e) {
495                 if (dataAsList.size() == dataAsSet.size())
496                     fail("Expected no ISE on input without duplicates");
497             }
498 
499             exerciseMapCollection(data, toMap(keyFn, valueFn, op),
500                                   new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
501 
502             exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
503                                   new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
504         }
505 
506         // For concurrent maps, only use commutative merge functions
507         try {
508             exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
509                                   new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
510             if (dataAsList.size() != dataAsSet.size())
511                 fail("Expected ISE on input with duplicates");
512         }
513         catch (IllegalStateException e) {
514             if (dataAsList.size() == dataAsSet.size())
515                 fail("Expected no ISE on input without duplicates");
516         }
517 
518         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
519                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
520 
521         exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
522                               new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
523     }
524 
525     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
526     public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
527         Function<Integer, Integer> classifier = i -> i % 3;
528 
529         // Single-level groupBy
530         exerciseMapCollection(data, groupingBy(classifier),
531                               new GroupingByAssertion<>(classifier, HashMap.class,
532                                                         new ToListAssertion<>()));
533         exerciseMapCollection(data, groupingByConcurrent(classifier),
534                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
535                                                         new ToListAssertion<>()));
536 
537         // With explicit constructors
538         exerciseMapCollection(data,
539                               groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
540                               new GroupingByAssertion<>(classifier, TreeMap.class,
541                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
542         exerciseMapCollection(data,
543                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
544                                                    toCollection(HashSet::new)),
545                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
546                                                         new ToCollectionAssertion<Integer>(HashSet.class, false)));
547     }
548 
549     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
550     public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
551         Function<Integer, Integer> classifier = i -> i % 3;
552         Function<Integer, Integer> mapper = i -> i * 2;
553 
554         exerciseMapCollection(data,
555                               groupingBy(classifier, mapping(mapper, toList())),
556                               new GroupingByAssertion<>(classifier, HashMap.class,
557                                                         new MappingAssertion<>(mapper,
558                                                                                new ToListAssertion<>())));
559     }
560 
561     @Test(groups = { "serialization-hostile" })
562     public void testFlatMappingClose() {
563         Function<Integer, Integer> classifier = i -> i;
564         AtomicInteger ai = new AtomicInteger();
565         Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
566         Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
567         assertEquals(m.size(), ai.get());
568     }
569 
570     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
571     public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
572         Function<Integer, Integer> classifier = i -> i % 3;
573         Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
574         Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
575         Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
576 
577         exerciseMapCollection(data,
578                               groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
579                               new GroupingByAssertion<>(classifier, HashMap.class,
580                                                         new FlatMappingAssertion<>(flatMapperBy0,
581                                                                                    new ToListAssertion<>())));
582         exerciseMapCollection(data,
583                               groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
584                               new GroupingByAssertion<>(classifier, HashMap.class,
585                                                         new FlatMappingAssertion<>(flatMapperBy0,
586                                                                                    new ToListAssertion<>())));
587         exerciseMapCollection(data,
588                               groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
589                               new GroupingByAssertion<>(classifier, HashMap.class,
590                                                         new FlatMappingAssertion<>(flatMapperBy2,
591                                                                                    new ToListAssertion<>())));
592     }
593 
594     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
595     public void testGroupingByWithFiltering(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
596         Function<Integer, Integer> classifier = i -> i % 3;
597         Predicate<Integer> filteringByMod2 = i -> i % 2 == 0;
598         Predicate<Integer> filteringByUnder100 = i -> i % 2 < 100;
599         Predicate<Integer> filteringByTrue = i -> true;
600         Predicate<Integer> filteringByFalse = i -> false;
601 
602         exerciseMapCollection(data,
603                               groupingBy(classifier, filtering(filteringByMod2, toList())),
604                               new GroupingByAssertion<>(classifier, HashMap.class,
605                                                         new FilteringAssertion<>(filteringByMod2,
606                                                                                    new ToListAssertion<>())));
607         exerciseMapCollection(data,
608                               groupingBy(classifier, filtering(filteringByUnder100, toList())),
609                               new GroupingByAssertion<>(classifier, HashMap.class,
610                                                         new FilteringAssertion<>(filteringByUnder100,
611                                                                                    new ToListAssertion<>())));
612         exerciseMapCollection(data,
613                               groupingBy(classifier, filtering(filteringByTrue, toList())),
614                               new GroupingByAssertion<>(classifier, HashMap.class,
615                                                         new FilteringAssertion<>(filteringByTrue,
616                                                                                    new ToListAssertion<>())));
617         exerciseMapCollection(data,
618                               groupingBy(classifier, filtering(filteringByFalse, toList())),
619                               new GroupingByAssertion<>(classifier, HashMap.class,
620                                                         new FilteringAssertion<>(filteringByFalse,
621                                                                                    new ToListAssertion<>())));
622     }
623 
624     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
625     public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
626         Function<Integer, Integer> classifier = i -> i % 6;
627         Function<Integer, Integer> classifier2 = i -> i % 23;
628 
629         // Two-level groupBy
630         exerciseMapCollection(data,
631                               groupingBy(classifier, groupingBy(classifier2)),
632                               new GroupingByAssertion<>(classifier, HashMap.class,
633                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
634                                                                                   new ToListAssertion<>())));
635         // with concurrent as upstream
636         exerciseMapCollection(data,
637                               groupingByConcurrent(classifier, groupingBy(classifier2)),
638                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
639                                                         new GroupingByAssertion<>(classifier2, HashMap.class,
640                                                                                   new ToListAssertion<>())));
641         // with concurrent as downstream
642         exerciseMapCollection(data,
643                               groupingBy(classifier, groupingByConcurrent(classifier2)),
644                               new GroupingByAssertion<>(classifier, HashMap.class,
645                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
646                                                                                   new ToListAssertion<>())));
647         // with concurrent as upstream and downstream
648         exerciseMapCollection(data,
649                               groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
650                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
651                                                         new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
652                                                                                   new ToListAssertion<>())));
653 
654         // With explicit constructors
655         exerciseMapCollection(data,
656                               groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
657                               new GroupingByAssertion<>(classifier, TreeMap.class,
658                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
659                                                                                   new ToCollectionAssertion<Integer>(HashSet.class, false))));
660         // with concurrent as upstream
661         exerciseMapCollection(data,
662                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
663                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
664                                                         new GroupingByAssertion<>(classifier2, TreeMap.class,
665                                                                                   new ToListAssertion<>())));
666         // with concurrent as downstream
667         exerciseMapCollection(data,
668                               groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
669                               new GroupingByAssertion<>(classifier, TreeMap.class,
670                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
671                                                                                   new ToListAssertion<>())));
672         // with concurrent as upstream and downstream
673         exerciseMapCollection(data,
674                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
675                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
676                                                         new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
677                                                                                   new ToListAssertion<>())));
678     }
679 
680     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
681     public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
682         Function<Integer, Integer> classifier = i -> i % 3;
683 
684         // Single-level simple reduce
685         exerciseMapCollection(data,
686                               groupingBy(classifier, reducing(0, Integer::sum)),
687                               new GroupingByAssertion<>(classifier, HashMap.class,
688                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
689         // with concurrent
690         exerciseMapCollection(data,
691                               groupingByConcurrent(classifier, reducing(0, Integer::sum)),
692                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
693                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
694 
695         // With explicit constructors
696         exerciseMapCollection(data,
697                               groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
698                               new GroupingByAssertion<>(classifier, TreeMap.class,
699                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
700         // with concurrent
701         exerciseMapCollection(data,
702                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
703                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
704                                                         new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
705 
706         // Single-level map-reduce
707         exerciseMapCollection(data,
708                               groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
709                               new GroupingByAssertion<>(classifier, HashMap.class,
710                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
711         // with concurrent
712         exerciseMapCollection(data,
713                               groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
714                               new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
715                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
716 
717         // With explicit constructors
718         exerciseMapCollection(data,
719                               groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
720                               new GroupingByAssertion<>(classifier, TreeMap.class,
721                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
722         // with concurrent
723         exerciseMapCollection(data,
724                               groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
725                               new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
726                                                         new ReducingAssertion<>(0, mDoubler, Integer::sum)));
727     }
728 
729     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
730     public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
731         Predicate<Integer> classifier = i -> i % 3 == 0;
732 
733         // Single-level partition to downstream List
734         exerciseMapCollection(data,
735                               partitioningBy(classifier),
736                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
737         exerciseMapCollection(data,
738                               partitioningBy(classifier, toList()),
739                               new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
740     }
741 
742     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
743     public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
744         Predicate<Integer> classifier = i -> i % 3 == 0;
745         Predicate<Integer> classifier2 = i -> i % 7 == 0;
746 
747         // Two level partition
748         exerciseMapCollection(data,
749                               partitioningBy(classifier, partitioningBy(classifier2)),
750                               new PartitioningByAssertion<>(classifier,
751                                                             new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
752 
753         // Two level partition with reduce
754         exerciseMapCollection(data,
755                               partitioningBy(classifier, reducing(0, Integer::sum)),
756                               new PartitioningByAssertion<>(classifier,
757                                                             new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
758     }
759 
760     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
761     public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
762         List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
763         List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
764         assertEquals(asList, asImmutableList);
765         try {
766             asImmutableList.add(0);
767             fail("Expecting immutable result");
768         }
769         catch (UnsupportedOperationException ignored) { }
770     }
771 
772     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
773     public void testTeeing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
774         Collector<Integer, ?, Long> summing = Collectors.summingLong(Integer::valueOf);
775         Collector<Integer, ?, Long> counting = Collectors.counting();
776         Collector<Integer, ?, Integer> min = collectingAndThen(Collectors.<Integer>minBy(Comparator.naturalOrder()),
777                 opt -> opt.orElse(Integer.MAX_VALUE));
778         Collector<Integer, ?, Integer> max = collectingAndThen(Collectors.<Integer>maxBy(Comparator.naturalOrder()),
779                 opt -> opt.orElse(Integer.MIN_VALUE));
780         Collector<Integer, ?, String> joining = mapping(String::valueOf, Collectors.joining(", ", "[", "]"));
781 
782         Collector<Integer, ?, Map.Entry<Long, Long>> sumAndCount = Collectors.teeing(summing, counting, Map::entry);
783         Collector<Integer, ?, Map.Entry<Integer, Integer>> minAndMax = Collectors.teeing(min, max, Map::entry);
784         Collector<Integer, ?, Double> averaging = Collectors.teeing(summing, counting,
785                 (sum, count) -> ((double)sum) / count);
786         Collector<Integer, ?, String> summaryStatistics = Collectors.teeing(sumAndCount, minAndMax,
787                 (sumCountEntry, minMaxEntry) -> new IntSummaryStatistics(
788                         sumCountEntry.getValue(), minMaxEntry.getKey(),
789                         minMaxEntry.getValue(), sumCountEntry.getKey()).toString());
790         Collector<Integer, ?, String> countAndContent = Collectors.teeing(counting, joining,
791                 (count, content) -> count+": "+content);
792 
793         assertCollect(data, sumAndCount, stream -> {
794             List<Integer> list = stream.collect(toList());
795             return Map.entry(list.stream().mapToLong(Integer::intValue).sum(), (long) list.size());
796         });
797         assertCollect(data, averaging, stream -> stream.mapToInt(Integer::intValue).average().orElse(Double.NaN));
798         assertCollect(data, summaryStatistics,
799                 stream -> stream.mapToInt(Integer::intValue).summaryStatistics().toString());
800         assertCollect(data, countAndContent, stream -> {
801             List<Integer> list = stream.collect(toList());
802             return list.size()+": "+list;
803         });
804 
805         Function<Integer, Integer> classifier = i -> i % 3;
806         exerciseMapCollection(data, groupingBy(classifier, sumAndCount),
807                 new GroupingByAssertion<>(classifier, Map.class,
808                         new TeeingAssertion<>(summing, counting, Map::entry)));
809     }
810 }
811