1/*
2 * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23package org.openjdk.tests.java.util.stream;
24
25import java.util.ArrayList;
26import java.util.Arrays;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Comparator;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.List;
34import java.util.Map;
35import java.util.Optional;
36import java.util.Set;
37import java.util.StringJoiner;
38import java.util.TreeMap;
39import java.util.concurrent.ConcurrentHashMap;
40import java.util.concurrent.ConcurrentSkipListMap;
41import java.util.concurrent.atomic.AtomicInteger;
42import java.util.function.BinaryOperator;
43import java.util.function.Function;
44import java.util.function.Predicate;
45import java.util.function.Supplier;
46import java.util.stream.Collector;
47import java.util.stream.Collectors;
48import java.util.stream.LambdaTestHelpers;
49import java.util.stream.OpTestCase;
50import java.util.stream.Stream;
51import java.util.stream.StreamOpFlagTestHelper;
52import java.util.stream.StreamTestDataProvider;
53import java.util.stream.TestData;
54
55import org.testng.annotations.Test;
56
57import static java.util.stream.Collectors.collectingAndThen;
58import static java.util.stream.Collectors.flatMapping;
59import static java.util.stream.Collectors.filtering;
60import static java.util.stream.Collectors.groupingBy;
61import static java.util.stream.Collectors.groupingByConcurrent;
62import static java.util.stream.Collectors.mapping;
63import static java.util.stream.Collectors.partitioningBy;
64import static java.util.stream.Collectors.reducing;
65import static java.util.stream.Collectors.toCollection;
66import static java.util.stream.Collectors.toConcurrentMap;
67import static java.util.stream.Collectors.toList;
68import static java.util.stream.Collectors.toMap;
69import static java.util.stream.Collectors.toSet;
70import static java.util.stream.LambdaTestHelpers.assertContents;
71import static java.util.stream.LambdaTestHelpers.assertContentsUnordered;
72import static java.util.stream.LambdaTestHelpers.mDoubler;
73
74/*
75 * @test
76 * @bug 8071600 8144675
77 * @summary Test for collectors.
78 */
79public class CollectorsTest extends OpTestCase {
80
81    private abstract static class CollectorAssertion<T, U> {
82        abstract void assertValue(U value,
83                                  Supplier<Stream<T>> source,
84                                  boolean ordered) throws ReflectiveOperationException;
85    }
86
87    static class MappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
88        private final Function<T, V> mapper;
89        private final CollectorAssertion<V, R> downstream;
90
91        MappingAssertion(Function<T, V> mapper, CollectorAssertion<V, R> downstream) {
92            this.mapper = mapper;
93            this.downstream = downstream;
94        }
95
96        @Override
97        void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
98            downstream.assertValue(value,
99                                   () -> source.get().map(mapper::apply),
100                                   ordered);
101        }
102    }
103
104    static class FlatMappingAssertion<T, V, R> extends CollectorAssertion<T, R> {
105        private final Function<T, Stream<V>> mapper;
106        private final CollectorAssertion<V, R> downstream;
107
108        FlatMappingAssertion(Function<T, Stream<V>> mapper,
109                             CollectorAssertion<V, R> downstream) {
110            this.mapper = mapper;
111            this.downstream = downstream;
112        }
113
114        @Override
115        void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
116            downstream.assertValue(value,
117                                   () -> source.get().flatMap(mapper::apply),
118                                   ordered);
119        }
120    }
121
122    static class FilteringAssertion<T, R> extends CollectorAssertion<T, R> {
123        private final Predicate<T> filter;
124        private final CollectorAssertion<T, R> downstream;
125
126        public FilteringAssertion(Predicate<T> filter, CollectorAssertion<T, R> downstream) {
127            this.filter = filter;
128            this.downstream = downstream;
129        }
130
131        @Override
132        void assertValue(R value, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
133            downstream.assertValue(value,
134                                   () -> source.get().filter(filter),
135                                   ordered);
136        }
137    }
138
139    static class GroupingByAssertion<T, K, V, M extends Map<K, ? extends V>> extends CollectorAssertion<T, M> {
140        private final Class<? extends Map> clazz;
141        private final Function<T, K> classifier;
142        private final CollectorAssertion<T,V> downstream;
143
144        GroupingByAssertion(Function<T, K> classifier, Class<? extends Map> clazz,
145                            CollectorAssertion<T, V> downstream) {
146            this.clazz = clazz;
147            this.classifier = classifier;
148            this.downstream = downstream;
149        }
150
151        @Override
152        void assertValue(M map,
153                         Supplier<Stream<T>> source,
154                         boolean ordered) throws ReflectiveOperationException {
155            if (!clazz.isAssignableFrom(map.getClass()))
156                fail(String.format("Class mismatch in GroupingByAssertion: %s, %s", clazz, map.getClass()));
157            assertContentsUnordered(map.keySet(), source.get().map(classifier).collect(toSet()));
158            for (Map.Entry<K, ? extends V> entry : map.entrySet()) {
159                K key = entry.getKey();
160                downstream.assertValue(entry.getValue(),
161                                       () -> source.get().filter(e -> classifier.apply(e).equals(key)),
162                                       ordered);
163            }
164        }
165    }
166
167    static class ToMapAssertion<T, K, V, M extends Map<K,V>> extends CollectorAssertion<T, M> {
168        private final Class<? extends Map> clazz;
169        private final Function<T, K> keyFn;
170        private final Function<T, V> valueFn;
171        private final BinaryOperator<V> mergeFn;
172
173        ToMapAssertion(Function<T, K> keyFn,
174                       Function<T, V> valueFn,
175                       BinaryOperator<V> mergeFn,
176                       Class<? extends Map> clazz) {
177            this.clazz = clazz;
178            this.keyFn = keyFn;
179            this.valueFn = valueFn;
180            this.mergeFn = mergeFn;
181        }
182
183        @Override
184        void assertValue(M map, Supplier<Stream<T>> source, boolean ordered) throws ReflectiveOperationException {
185            if (!clazz.isAssignableFrom(map.getClass()))
186                fail(String.format("Class mismatch in ToMapAssertion: %s, %s", clazz, map.getClass()));
187            Set<K> uniqueKeys = source.get().map(keyFn).collect(toSet());
188            assertEquals(uniqueKeys, map.keySet());
189            source.get().forEach(t -> {
190                K key = keyFn.apply(t);
191                V v = source.get()
192                            .filter(e -> key.equals(keyFn.apply(e)))
193                            .map(valueFn)
194                            .reduce(mergeFn)
195                            .get();
196                assertEquals(map.get(key), v);
197            });
198        }
199    }
200
201    static class PartitioningByAssertion<T, D> extends CollectorAssertion<T, Map<Boolean,D>> {
202        private final Predicate<T> predicate;
203        private final CollectorAssertion<T,D> downstream;
204
205        PartitioningByAssertion(Predicate<T> predicate, CollectorAssertion<T, D> downstream) {
206            this.predicate = predicate;
207            this.downstream = downstream;
208        }
209
210        @Override
211        void assertValue(Map<Boolean, D> map,
212                         Supplier<Stream<T>> source,
213                         boolean ordered) throws ReflectiveOperationException {
214            if (!Map.class.isAssignableFrom(map.getClass()))
215                fail(String.format("Class mismatch in PartitioningByAssertion: %s", map.getClass()));
216            assertEquals(2, map.size());
217            downstream.assertValue(map.get(true), () -> source.get().filter(predicate), ordered);
218            downstream.assertValue(map.get(false), () -> source.get().filter(predicate.negate()), ordered);
219        }
220    }
221
222    static class ToListAssertion<T> extends CollectorAssertion<T, List<T>> {
223        @Override
224        void assertValue(List<T> value, Supplier<Stream<T>> source, boolean ordered)
225                throws ReflectiveOperationException {
226            if (!List.class.isAssignableFrom(value.getClass()))
227                fail(String.format("Class mismatch in ToListAssertion: %s", value.getClass()));
228            Stream<T> stream = source.get();
229            List<T> result = new ArrayList<>();
230            for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
231                result.add(it.next());
232            if (StreamOpFlagTestHelper.isStreamOrdered(stream) && ordered)
233                assertContents(value, result);
234            else
235                assertContentsUnordered(value, result);
236        }
237    }
238
239    static class ToCollectionAssertion<T> extends CollectorAssertion<T, Collection<T>> {
240        private final Class<? extends Collection> clazz;
241        private final boolean targetOrdered;
242
243        ToCollectionAssertion(Class<? extends Collection> clazz, boolean targetOrdered) {
244            this.clazz = clazz;
245            this.targetOrdered = targetOrdered;
246        }
247
248        @Override
249        void assertValue(Collection<T> value, Supplier<Stream<T>> source, boolean ordered)
250                throws ReflectiveOperationException {
251            if (!clazz.isAssignableFrom(value.getClass()))
252                fail(String.format("Class mismatch in ToCollectionAssertion: %s, %s", clazz, value.getClass()));
253            Stream<T> stream = source.get();
254            Collection<T> result = clazz.newInstance();
255            for (Iterator<T> it = stream.iterator(); it.hasNext(); ) // avoid capturing result::add
256                result.add(it.next());
257            if (StreamOpFlagTestHelper.isStreamOrdered(stream) && targetOrdered && ordered)
258                assertContents(value, result);
259            else
260                assertContentsUnordered(value, result);
261        }
262    }
263
264    static class ReducingAssertion<T, U> extends CollectorAssertion<T, U> {
265        private final U identity;
266        private final Function<T, U> mapper;
267        private final BinaryOperator<U> reducer;
268
269        ReducingAssertion(U identity, Function<T, U> mapper, BinaryOperator<U> reducer) {
270            this.identity = identity;
271            this.mapper = mapper;
272            this.reducer = reducer;
273        }
274
275        @Override
276        void assertValue(U value, Supplier<Stream<T>> source, boolean ordered)
277                throws ReflectiveOperationException {
278            Optional<U> reduced = source.get().map(mapper).reduce(reducer);
279            if (value == null)
280                assertTrue(!reduced.isPresent());
281            else if (!reduced.isPresent()) {
282                assertEquals(value, identity);
283            }
284            else {
285                assertEquals(value, reduced.get());
286            }
287        }
288    }
289
290    private <T> ResultAsserter<T> mapTabulationAsserter(boolean ordered) {
291        return (act, exp, ord, par) -> {
292            if (par && (!ordered || !ord)) {
293                CollectorsTest.nestedMapEqualityAssertion(act, exp);
294            }
295            else {
296                LambdaTestHelpers.assertContentsEqual(act, exp);
297            }
298        };
299    }
300
301    private<T, M extends Map>
302    void exerciseMapCollection(TestData<T, Stream<T>> data,
303                               Collector<T, ?, ? extends M> collector,
304                               CollectorAssertion<T, M> assertion)
305            throws ReflectiveOperationException {
306        boolean ordered = !collector.characteristics().contains(Collector.Characteristics.UNORDERED);
307
308        M m = withData(data)
309                .terminal(s -> s.collect(collector))
310                .resultAsserter(mapTabulationAsserter(ordered))
311                .exercise();
312        assertion.assertValue(m, () -> data.stream(), ordered);
313
314        m = withData(data)
315                .terminal(s -> s.unordered().collect(collector))
316                .resultAsserter(mapTabulationAsserter(ordered))
317                .exercise();
318        assertion.assertValue(m, () -> data.stream(), false);
319    }
320
321    private static void nestedMapEqualityAssertion(Object o1, Object o2) {
322        if (o1 instanceof Map) {
323            Map m1 = (Map) o1;
324            Map m2 = (Map) o2;
325            assertContentsUnordered(m1.keySet(), m2.keySet());
326            for (Object k : m1.keySet())
327                nestedMapEqualityAssertion(m1.get(k), m2.get(k));
328        }
329        else if (o1 instanceof Collection) {
330            assertContentsUnordered(((Collection) o1), ((Collection) o2));
331        }
332        else
333            assertEquals(o1, o2);
334    }
335
336    private<T, R> void assertCollect(TestData.OfRef<T> data,
337                                     Collector<T, ?, R> collector,
338                                     Function<Stream<T>, R> streamReduction) {
339        R check = streamReduction.apply(data.stream());
340        withData(data).terminal(s -> s.collect(collector)).expectedResult(check).exercise();
341    }
342
343    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
344    public void testReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
345        assertCollect(data, Collectors.reducing(0, Integer::sum),
346                      s -> s.reduce(0, Integer::sum));
347        assertCollect(data, Collectors.reducing(Integer.MAX_VALUE, Integer::min),
348                      s -> s.min(Integer::compare).orElse(Integer.MAX_VALUE));
349        assertCollect(data, Collectors.reducing(Integer.MIN_VALUE, Integer::max),
350                      s -> s.max(Integer::compare).orElse(Integer.MIN_VALUE));
351
352        assertCollect(data, Collectors.reducing(Integer::sum),
353                      s -> s.reduce(Integer::sum));
354        assertCollect(data, Collectors.minBy(Comparator.naturalOrder()),
355                      s -> s.min(Integer::compare));
356        assertCollect(data, Collectors.maxBy(Comparator.naturalOrder()),
357                      s -> s.max(Integer::compare));
358
359        assertCollect(data, Collectors.reducing(0, x -> x*2, Integer::sum),
360                      s -> s.map(x -> x*2).reduce(0, Integer::sum));
361
362        assertCollect(data, Collectors.summingLong(x -> x * 2L),
363                      s -> s.map(x -> x*2L).reduce(0L, Long::sum));
364        assertCollect(data, Collectors.summingInt(x -> x * 2),
365                      s -> s.map(x -> x*2).reduce(0, Integer::sum));
366        assertCollect(data, Collectors.summingDouble(x -> x * 2.0d),
367                      s -> s.map(x -> x * 2.0d).reduce(0.0d, Double::sum));
368
369        assertCollect(data, Collectors.averagingInt(x -> x * 2),
370                      s -> s.mapToInt(x -> x * 2).average().orElse(0));
371        assertCollect(data, Collectors.averagingLong(x -> x * 2),
372                      s -> s.mapToLong(x -> x * 2).average().orElse(0));
373        assertCollect(data, Collectors.averagingDouble(x -> x * 2),
374                      s -> s.mapToDouble(x -> x * 2).average().orElse(0));
375
376        // Test explicit Collector.of
377        Collector<Integer, long[], Double> avg2xint = Collector.of(() -> new long[2],
378                                                                   (a, b) -> {
379                                                                       a[0] += b * 2;
380                                                                       a[1]++;
381                                                                   },
382                                                                   (a, b) -> {
383                                                                       a[0] += b[0];
384                                                                       a[1] += b[1];
385                                                                       return a;
386                                                                   },
387                                                                   a -> a[1] == 0 ? 0.0d : (double) a[0] / a[1]);
388        assertCollect(data, avg2xint,
389                      s -> s.mapToInt(x -> x * 2).average().orElse(0));
390    }
391
392    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
393    public void testJoining(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
394        withData(data)
395                .terminal(s -> s.map(Object::toString).collect(Collectors.joining()))
396                .expectedResult(join(data, ""))
397                .exercise();
398
399        Collector<String, StringBuilder, String> likeJoining = Collector.of(StringBuilder::new, StringBuilder::append, (sb1, sb2) -> sb1.append(sb2.toString()), StringBuilder::toString);
400        withData(data)
401                .terminal(s -> s.map(Object::toString).collect(likeJoining))
402                .expectedResult(join(data, ""))
403                .exercise();
404
405        withData(data)
406                .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",")))
407                .expectedResult(join(data, ","))
408                .exercise();
409
410        withData(data)
411                .terminal(s -> s.map(Object::toString).collect(Collectors.joining(",", "[", "]")))
412                .expectedResult("[" + join(data, ",") + "]")
413                .exercise();
414
415        withData(data)
416                .terminal(s -> s.map(Object::toString)
417                                .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
418                                .toString())
419                .expectedResult(join(data, ""))
420                .exercise();
421
422        withData(data)
423                .terminal(s -> s.map(Object::toString)
424                                .collect(() -> new StringJoiner(","),
425                                         (sj, cs) -> sj.add(cs),
426                                         (j1, j2) -> j1.merge(j2))
427                                .toString())
428                .expectedResult(join(data, ","))
429                .exercise();
430
431        withData(data)
432                .terminal(s -> s.map(Object::toString)
433                                .collect(() -> new StringJoiner(",", "[", "]"),
434                                         (sj, cs) -> sj.add(cs),
435                                         (j1, j2) -> j1.merge(j2))
436                                .toString())
437                .expectedResult("[" + join(data, ",") + "]")
438                .exercise();
439    }
440
441    private<T> String join(TestData.OfRef<T> data, String delim) {
442        StringBuilder sb = new StringBuilder();
443        boolean first = true;
444        for (T i : data) {
445            if (!first)
446                sb.append(delim);
447            sb.append(i.toString());
448            first = false;
449        }
450        return sb.toString();
451    }
452
453    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
454    public void testSimpleToMap(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
455        Function<Integer, Integer> keyFn = i -> i * 2;
456        Function<Integer, Integer> valueFn = i -> i * 4;
457
458        List<Integer> dataAsList = Arrays.asList(data.stream().toArray(Integer[]::new));
459        Set<Integer> dataAsSet = new HashSet<>(dataAsList);
460
461        BinaryOperator<Integer> sum = Integer::sum;
462        for (BinaryOperator<Integer> op : Arrays.asList((u, v) -> u,
463                                                        (u, v) -> v,
464                                                        sum)) {
465            try {
466                exerciseMapCollection(data, toMap(keyFn, valueFn),
467                                      new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
468                if (dataAsList.size() != dataAsSet.size())
469                    fail("Expected ISE on input with duplicates");
470            }
471            catch (IllegalStateException e) {
472                if (dataAsList.size() == dataAsSet.size())
473                    fail("Expected no ISE on input without duplicates");
474            }
475
476            exerciseMapCollection(data, toMap(keyFn, valueFn, op),
477                                  new ToMapAssertion<>(keyFn, valueFn, op, HashMap.class));
478
479            exerciseMapCollection(data, toMap(keyFn, valueFn, op, TreeMap::new),
480                                  new ToMapAssertion<>(keyFn, valueFn, op, TreeMap.class));
481        }
482
483        // For concurrent maps, only use commutative merge functions
484        try {
485            exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn),
486                                  new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
487            if (dataAsList.size() != dataAsSet.size())
488                fail("Expected ISE on input with duplicates");
489        }
490        catch (IllegalStateException e) {
491            if (dataAsList.size() == dataAsSet.size())
492                fail("Expected no ISE on input without duplicates");
493        }
494
495        exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum),
496                              new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentHashMap.class));
497
498        exerciseMapCollection(data, toConcurrentMap(keyFn, valueFn, sum, ConcurrentSkipListMap::new),
499                              new ToMapAssertion<>(keyFn, valueFn, sum, ConcurrentSkipListMap.class));
500    }
501
502    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
503    public void testSimpleGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
504        Function<Integer, Integer> classifier = i -> i % 3;
505
506        // Single-level groupBy
507        exerciseMapCollection(data, groupingBy(classifier),
508                              new GroupingByAssertion<>(classifier, HashMap.class,
509                                                        new ToListAssertion<>()));
510        exerciseMapCollection(data, groupingByConcurrent(classifier),
511                              new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
512                                                        new ToListAssertion<>()));
513
514        // With explicit constructors
515        exerciseMapCollection(data,
516                              groupingBy(classifier, TreeMap::new, toCollection(HashSet::new)),
517                              new GroupingByAssertion<>(classifier, TreeMap.class,
518                                                        new ToCollectionAssertion<Integer>(HashSet.class, false)));
519        exerciseMapCollection(data,
520                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new,
521                                                   toCollection(HashSet::new)),
522                              new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
523                                                        new ToCollectionAssertion<Integer>(HashSet.class, false)));
524    }
525
526    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
527    public void testGroupingByWithMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
528        Function<Integer, Integer> classifier = i -> i % 3;
529        Function<Integer, Integer> mapper = i -> i * 2;
530
531        exerciseMapCollection(data,
532                              groupingBy(classifier, mapping(mapper, toList())),
533                              new GroupingByAssertion<>(classifier, HashMap.class,
534                                                        new MappingAssertion<>(mapper,
535                                                                               new ToListAssertion<>())));
536    }
537
538    @Test(groups = { "serialization-hostile" })
539    public void testFlatMappingClose() {
540        Function<Integer, Integer> classifier = i -> i;
541        AtomicInteger ai = new AtomicInteger();
542        Function<Integer, Stream<Integer>> flatMapper = i -> Stream.of(i, i).onClose(ai::getAndIncrement);
543        Map<Integer, List<Integer>> m = Stream.of(1, 2).collect(groupingBy(classifier, flatMapping(flatMapper, toList())));
544        assertEquals(m.size(), ai.get());
545    }
546
547    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
548    public void testGroupingByWithFlatMapping(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
549        Function<Integer, Integer> classifier = i -> i % 3;
550        Function<Integer, Stream<Integer>> flatMapperByNull = i -> null;
551        Function<Integer, Stream<Integer>> flatMapperBy0 = i -> Stream.empty();
552        Function<Integer, Stream<Integer>> flatMapperBy2 = i -> Stream.of(i, i);
553
554        exerciseMapCollection(data,
555                              groupingBy(classifier, flatMapping(flatMapperByNull, toList())),
556                              new GroupingByAssertion<>(classifier, HashMap.class,
557                                                        new FlatMappingAssertion<>(flatMapperBy0,
558                                                                                   new ToListAssertion<>())));
559        exerciseMapCollection(data,
560                              groupingBy(classifier, flatMapping(flatMapperBy0, toList())),
561                              new GroupingByAssertion<>(classifier, HashMap.class,
562                                                        new FlatMappingAssertion<>(flatMapperBy0,
563                                                                                   new ToListAssertion<>())));
564        exerciseMapCollection(data,
565                              groupingBy(classifier, flatMapping(flatMapperBy2, toList())),
566                              new GroupingByAssertion<>(classifier, HashMap.class,
567                                                        new FlatMappingAssertion<>(flatMapperBy2,
568                                                                                   new ToListAssertion<>())));
569    }
570
571    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
572    public void testGroupingByWithFiltering(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
573        Function<Integer, Integer> classifier = i -> i % 3;
574        Predicate<Integer> filteringByMod2 = i -> i % 2 == 0;
575        Predicate<Integer> filteringByUnder100 = i -> i % 2 < 100;
576        Predicate<Integer> filteringByTrue = i -> true;
577        Predicate<Integer> filteringByFalse = i -> false;
578
579        exerciseMapCollection(data,
580                              groupingBy(classifier, filtering(filteringByMod2, toList())),
581                              new GroupingByAssertion<>(classifier, HashMap.class,
582                                                        new FilteringAssertion<>(filteringByMod2,
583                                                                                   new ToListAssertion<>())));
584        exerciseMapCollection(data,
585                              groupingBy(classifier, filtering(filteringByUnder100, toList())),
586                              new GroupingByAssertion<>(classifier, HashMap.class,
587                                                        new FilteringAssertion<>(filteringByUnder100,
588                                                                                   new ToListAssertion<>())));
589        exerciseMapCollection(data,
590                              groupingBy(classifier, filtering(filteringByTrue, toList())),
591                              new GroupingByAssertion<>(classifier, HashMap.class,
592                                                        new FilteringAssertion<>(filteringByTrue,
593                                                                                   new ToListAssertion<>())));
594        exerciseMapCollection(data,
595                              groupingBy(classifier, filtering(filteringByFalse, toList())),
596                              new GroupingByAssertion<>(classifier, HashMap.class,
597                                                        new FilteringAssertion<>(filteringByFalse,
598                                                                                   new ToListAssertion<>())));
599    }
600
601    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
602    public void testTwoLevelGroupingBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
603        Function<Integer, Integer> classifier = i -> i % 6;
604        Function<Integer, Integer> classifier2 = i -> i % 23;
605
606        // Two-level groupBy
607        exerciseMapCollection(data,
608                              groupingBy(classifier, groupingBy(classifier2)),
609                              new GroupingByAssertion<>(classifier, HashMap.class,
610                                                        new GroupingByAssertion<>(classifier2, HashMap.class,
611                                                                                  new ToListAssertion<>())));
612        // with concurrent as upstream
613        exerciseMapCollection(data,
614                              groupingByConcurrent(classifier, groupingBy(classifier2)),
615                              new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
616                                                        new GroupingByAssertion<>(classifier2, HashMap.class,
617                                                                                  new ToListAssertion<>())));
618        // with concurrent as downstream
619        exerciseMapCollection(data,
620                              groupingBy(classifier, groupingByConcurrent(classifier2)),
621                              new GroupingByAssertion<>(classifier, HashMap.class,
622                                                        new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
623                                                                                  new ToListAssertion<>())));
624        // with concurrent as upstream and downstream
625        exerciseMapCollection(data,
626                              groupingByConcurrent(classifier, groupingByConcurrent(classifier2)),
627                              new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
628                                                        new GroupingByAssertion<>(classifier2, ConcurrentHashMap.class,
629                                                                                  new ToListAssertion<>())));
630
631        // With explicit constructors
632        exerciseMapCollection(data,
633                              groupingBy(classifier, TreeMap::new, groupingBy(classifier2, TreeMap::new, toCollection(HashSet::new))),
634                              new GroupingByAssertion<>(classifier, TreeMap.class,
635                                                        new GroupingByAssertion<>(classifier2, TreeMap.class,
636                                                                                  new ToCollectionAssertion<Integer>(HashSet.class, false))));
637        // with concurrent as upstream
638        exerciseMapCollection(data,
639                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingBy(classifier2, TreeMap::new, toList())),
640                              new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
641                                                        new GroupingByAssertion<>(classifier2, TreeMap.class,
642                                                                                  new ToListAssertion<>())));
643        // with concurrent as downstream
644        exerciseMapCollection(data,
645                              groupingBy(classifier, TreeMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
646                              new GroupingByAssertion<>(classifier, TreeMap.class,
647                                                        new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
648                                                                                  new ToListAssertion<>())));
649        // with concurrent as upstream and downstream
650        exerciseMapCollection(data,
651                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, groupingByConcurrent(classifier2, ConcurrentSkipListMap::new, toList())),
652                              new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
653                                                        new GroupingByAssertion<>(classifier2, ConcurrentSkipListMap.class,
654                                                                                  new ToListAssertion<>())));
655    }
656
657    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
658    public void testGroupubgByWithReducing(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
659        Function<Integer, Integer> classifier = i -> i % 3;
660
661        // Single-level simple reduce
662        exerciseMapCollection(data,
663                              groupingBy(classifier, reducing(0, Integer::sum)),
664                              new GroupingByAssertion<>(classifier, HashMap.class,
665                                                        new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
666        // with concurrent
667        exerciseMapCollection(data,
668                              groupingByConcurrent(classifier, reducing(0, Integer::sum)),
669                              new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
670                                                        new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
671
672        // With explicit constructors
673        exerciseMapCollection(data,
674                              groupingBy(classifier, TreeMap::new, reducing(0, Integer::sum)),
675                              new GroupingByAssertion<>(classifier, TreeMap.class,
676                                                        new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
677        // with concurrent
678        exerciseMapCollection(data,
679                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, Integer::sum)),
680                              new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
681                                                        new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
682
683        // Single-level map-reduce
684        exerciseMapCollection(data,
685                              groupingBy(classifier, reducing(0, mDoubler, Integer::sum)),
686                              new GroupingByAssertion<>(classifier, HashMap.class,
687                                                        new ReducingAssertion<>(0, mDoubler, Integer::sum)));
688        // with concurrent
689        exerciseMapCollection(data,
690                              groupingByConcurrent(classifier, reducing(0, mDoubler, Integer::sum)),
691                              new GroupingByAssertion<>(classifier, ConcurrentHashMap.class,
692                                                        new ReducingAssertion<>(0, mDoubler, Integer::sum)));
693
694        // With explicit constructors
695        exerciseMapCollection(data,
696                              groupingBy(classifier, TreeMap::new, reducing(0, mDoubler, Integer::sum)),
697                              new GroupingByAssertion<>(classifier, TreeMap.class,
698                                                        new ReducingAssertion<>(0, mDoubler, Integer::sum)));
699        // with concurrent
700        exerciseMapCollection(data,
701                              groupingByConcurrent(classifier, ConcurrentSkipListMap::new, reducing(0, mDoubler, Integer::sum)),
702                              new GroupingByAssertion<>(classifier, ConcurrentSkipListMap.class,
703                                                        new ReducingAssertion<>(0, mDoubler, Integer::sum)));
704    }
705
706    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
707    public void testSimplePartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
708        Predicate<Integer> classifier = i -> i % 3 == 0;
709
710        // Single-level partition to downstream List
711        exerciseMapCollection(data,
712                              partitioningBy(classifier),
713                              new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
714        exerciseMapCollection(data,
715                              partitioningBy(classifier, toList()),
716                              new PartitioningByAssertion<>(classifier, new ToListAssertion<>()));
717    }
718
719    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
720    public void testTwoLevelPartitioningBy(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
721        Predicate<Integer> classifier = i -> i % 3 == 0;
722        Predicate<Integer> classifier2 = i -> i % 7 == 0;
723
724        // Two level partition
725        exerciseMapCollection(data,
726                              partitioningBy(classifier, partitioningBy(classifier2)),
727                              new PartitioningByAssertion<>(classifier,
728                                                            new PartitioningByAssertion(classifier2, new ToListAssertion<>())));
729
730        // Two level partition with reduce
731        exerciseMapCollection(data,
732                              partitioningBy(classifier, reducing(0, Integer::sum)),
733                              new PartitioningByAssertion<>(classifier,
734                                                            new ReducingAssertion<>(0, LambdaTestHelpers.identity(), Integer::sum)));
735    }
736
737    @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
738    public void testComposeFinisher(String name, TestData.OfRef<Integer> data) throws ReflectiveOperationException {
739        List<Integer> asList = exerciseTerminalOps(data, s -> s.collect(toList()));
740        List<Integer> asImmutableList = exerciseTerminalOps(data, s -> s.collect(collectingAndThen(toList(), Collections::unmodifiableList)));
741        assertEquals(asList, asImmutableList);
742        try {
743            asImmutableList.add(0);
744            fail("Expecting immutable result");
745        }
746        catch (UnsupportedOperationException ignored) { }
747    }
748
749}
750