Java New Features (6): Implementing a Custom Collector, and the Collectors Class



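  1. To implement the Collector interface, first implement all of its abstract methods in a class, then pass an instance of that class to collect in the main program.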
    import java.util.*;
    import java.util.function.*;
    import java.util.stream.Collector;

    public class CollectorSetTest<T> implements Collector<T, Set<T>, Set<T>> {
        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello", "World", "NiHao");
            Set<String> strings = list.stream().collect(new CollectorSetTest<String>());
        }

        public Supplier<Set<T>> supplier() {
            System.out.println("supplier invoked");
            return HashSet::new;
        }

        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked");
    //        return HashSet<T>::add; // does not compile: the container is typed as Set<T>, not HashSet<T>
    //        return Set<T>::add;     // compiles
            return (set, item) -> set.add(item); // equivalent to Set<T>::add
        }

        public BinaryOperator<Set<T>> combiner() {
            System.out.println("combiner invoked");
            return (set1, set2) -> { set1.addAll(set2); return set1; };
        }

        public Function<Set<T>, Set<T>> finisher() {
            System.out.println("finisher invoked");
    //        return set -> set;
            return Function.identity();
        }

        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            return Collections.unmodifiableSet(EnumSet.of
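
    Because the listing above breaks off inside characteristics(), here is a minimal, self-contained sketch of the same idea built with Collector.of instead of a hand-written class. The names SetCollectorSketch and toHashSet are made up for illustration. The UNORDERED flag is an assumption that fits a HashSet container; it is not something the truncated source confirms.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class SetCollectorSketch {

    // Hypothetical helper: collects stream elements into a HashSet.
    public static <T> Collector<T, Set<T>, Set<T>> toHashSet() {
        Supplier<Set<T>> supplier = HashSet::new;          // creates the result container
        BiConsumer<Set<T>, T> accumulator = Set::add;      // folds one element into it
        BinaryOperator<Set<T>> combiner = (left, right) -> {
            left.addAll(right);                            // merges two partial results
            return left;
        };
        // Assumption: UNORDERED, because a HashSet keeps no encounter order.
        // This overload of Collector.of has no finisher argument and adds
        // IDENTITY_FINISH to the characteristics by itself.
        return Collector.of(supplier, accumulator, combiner,
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("Hello", "World", "NiHao");
        Set<String> strings = list.stream().collect(SetCollectorSketch.<String>toHashSet());
        System.out.println(strings);
    }
}
```

    Collector.of is convenient when the four functions are short lambdas; a dedicated class, as in the listing above, leaves more room for logging or extra state.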

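  2. Parallel streams: several threads may run, and each thread works on its own result container; the partial containers are then merged by the combiner. If the Collector declares Characteristics.CONCURRENT, all threads instead accumulate into a single shared result container, which must therefore be safe for concurrent modification.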
    import java.util.*;
    import java.util.function.*;
    import java.util.stream.Collector;

    public class CollectorSetTest2<T> implements Collector<T, Set<T>, Map<T, T>> {
    //    Goal: input Set<String>, output Map<String, String>
        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello", "World", "NiHao", "Beijing", "Hello", "a", "b", "c", "d", "e");
            Set<String> strings = new TreeSet<>();
            strings.addAll(list); // assumed missing step: without it the set is empty and list is never used
    //      By default a parallel stream runs on the common ForkJoinPool, whose parallelism defaults to
    //      the number of available processors minus one (the calling thread also takes part), so the
    //      thread count tracks the logical core count; with Intel Hyper-Threading each physical core
    //      counts as two logical cores.
            Map<String, String> map = strings.parallelStream().collect(new CollectorSetTest2<>()); // 1
    //        Map<String, String> map1 = strings.stream().parallel()
    //                .collect(new CollectorSetTest2<>()); // exactly equivalent to 1
        }

        public Supplier<Set<T>> supplier() {
    //      Requirement from the Collector Javadoc: the computations below must be equivalent:
    // * <pre>{@code
    // *     A a1 = supplier.get();
    // *     accumulator.accept(a1, t1);
    // *     accumulator.accept(a1, t2);
    // *     R r1 = finisher.apply(a1);  // result without splitting
    // *
    // *     A a2 = supplier.get();
    // *     accumulator.accept(a2, t1);
    // *     A a3 = supplier.get();
    // *     accumulator.accept(a3, t2);
    // *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
    // * } </pre>
            System.out.println("supplier invoked");
    //        return HashSet::new;
            return () -> {
                return new HashSet<T>();
            };
        }

        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked");
            return (set, item) -> {
                // Print the container and the thread name to observe parallel execution
                System.out.println(set + "," + Thread.currentThread().getName());
    //            With Characteristics.CONCURRENT this may throw ConcurrentModificationException,
    //            because printing iterates the shared set while another thread may be adding to it:
    //            it is not generally permissible for one thread to modify a Collection
    //            while another thread is iterating over it
                set.add(item); // assumed missing step: without it nothing is ever accumulated
            };
        }

        public BinaryOperator<Set<T>> combiner() {
    //        The returned function is applied only for a parallel stream WITHOUT Characteristics.CONCURRENT;
    //        with CONCURRENT all threads share one container, so there is nothing to merge
            System.out.println("combiner invoked");
            return (s, s1) -> { s.addAll(s1); return s; };
        }

        public Function<Set<T>, Map<T, T>> finisher() {
            System.out.println("finisher invoked");
            return set -> {
                Map<T, T> map = new HashMap<>();
                set.forEach(item -> map.put(item, item));
                return map;
            };
        }

        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
    //        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.CONCURRENT));
            return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
        }
    }
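
    The difference between the two modes can be observed by counting how often the supplier creates a container. The sketch below is an illustration, not the author's code: ConcurrentCollectorSketch, countingSetCollector and containersUsed are hypothetical names. It uses a thread-safe set from ConcurrentHashMap.newKeySet() so that sharing one container between threads is safe, unlike the HashSet in the listing above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.IntStream;

public class ConcurrentCollectorSketch {

    // Hypothetical helper: a set collector that counts how many containers it creates.
    public static <T> Collector<T, Set<T>, Set<T>> countingSetCollector(
            AtomicInteger containersCreated, boolean concurrent) {
        Supplier<Set<T>> supplier = () -> {
            containersCreated.incrementAndGet();
            return ConcurrentHashMap.<T>newKeySet(); // thread-safe, so it may be shared
        };
        BiConsumer<Set<T>, T> accumulator = Set::add;
        BinaryOperator<Set<T>> combiner = (left, right) -> {
            left.addAll(right);
            return left;
        };
        if (concurrent) {
            return Collector.of(supplier, accumulator, combiner,
                    Characteristics.UNORDERED, Characteristics.CONCURRENT);
        }
        return Collector.of(supplier, accumulator, combiner, Characteristics.UNORDERED);
    }

    // Collects 0..9999 in parallel and reports how many containers were created.
    public static int containersUsed(boolean concurrent) {
        AtomicInteger containersCreated = new AtomicInteger();
        Set<Integer> result = IntStream.range(0, 10_000).boxed().parallel()
                .collect(ConcurrentCollectorSketch.<Integer>countingSetCollector(
                        containersCreated, concurrent));
        if (result.size() != 10_000) {
            throw new IllegalStateException("elements were lost: " + result.size());
        }
        return containersCreated.get();
    }

    public static void main(String[] args) {
        // The first number depends on how the stream is split on this machine.
        System.out.println("without CONCURRENT: " + containersUsed(false) + " container(s)");
        System.out.println("with CONCURRENT:    " + containersUsed(true) + " container(s)");
    }
}
```

    With CONCURRENT (plus UNORDERED) the stream asks the supplier for exactly one container and never applies the combiner; without it, the number of containers depends on how the work is split.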


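  3. The implementations in the Collectors static factory class fall into two cases:
  4. A number (a primitive such as int) is a value type, whereas an array is a reference type. That is why summingInt, shown next, uses a one-element int[] as its result container: the accumulator lambda can update a[0] in place, which it could not do with a plain int.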
        /**
         * Returns a {@code Collector} that produces the sum of an integer-valued
         * function applied to the input elements.  If no elements are present,
         * the result is 0.
         *
         * @param <T> the type of the input elements
         * @param mapper a function extracting the property to be summed
         * @return a {@code Collector} that produces the sum of a derived property
         */
        public static <T> Collector<T, ?, Integer>
        summingInt(ToIntFunction<? super T> mapper) {
            return new CollectorImpl<>(
                    () -> new int[1], // a one-element array serves as the mutable result container
                    (a, t) -> { a[0] += mapper.applyAsInt(t); },
                    (a, b) -> { a[0] += b[0]; return a; },
                    a -> a[0], CH_NOID);
        }
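
    CollectorImpl is private to Collectors, so the same trick cannot be copied verbatim into application code. As a rough sketch, the one-element-array container can be reproduced with the public Collector.of factory; SummingIntSketch and sumInt are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.stream.Collector;

public class SummingIntSketch {

    // Hypothetical re-creation of the summingInt idea using the public API.
    public static <T> Collector<T, int[], Integer> sumInt(ToIntFunction<? super T> mapper) {
        return Collector.of(
                () -> new int[1],                                  // mutable container holding the running sum
                (acc, item) -> acc[0] += mapper.applyAsInt(item),  // add one mapped element
                (left, right) -> { left[0] += right[0]; return left; }, // merge partial sums
                acc -> acc[0]);                                    // unwrap the final int
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "World", "NiHao");
        int totalLength = words.stream().collect(sumInt(String::length));
        System.out.println(totalLength); // 5 + 5 + 5 = 15
    }
}
```

    A plain int could not play the container role here, because the accumulator has to mutate the state that the supplier handed out.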


  5. Background note:
    The primitive wrapper classes (Integer, Long, Short, Double, Float, Character, Byte, Boolean) are all immutable as well, so they cannot serve as a mutable result container either.
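
    A small sketch of what immutability means for a wrapper: incrementing an Integer variable does not change the object, it rebinds the variable to a new one. The class and method names below are made up for illustration.

```java
public class WrapperImmutabilitySketch {

    // Returns {value seen through the alias, value of the incremented variable}.
    public static int[] incrementThroughAlias() {
        Integer counter = 1000;
        Integer alias = counter; // both variables refer to the same Integer object
        counter++;               // unboxes, adds 1, boxes a new Integer; the old object is untouched
        return new int[] { alias, counter };
    }

    public static void main(String[] args) {
        int[] values = incrementThroughAlias();
        System.out.println(values[0] + " " + values[1]); // prints "1000 1001"
    }
}
```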
  6. Source walkthrough of groupingBy in Collectors:
        public static <T, K> Collector<T, ?, Map<K, List<T>>>
        groupingBy(Function<? super T, ? extends K> classifier) {
            return groupingBy(classifier, toList());
        }

        public static <T, K, A, D>
        Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                              Collector<? super T, A, D> downstream) {
            return groupingBy(classifier, HashMap::new, downstream);
        }

        public static <T, K, D, A, M extends Map<K, D>>
        Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                      Supplier<M> mapFactory,
                                      Collector<? super T, A, D> downstream) {
            Supplier<A> downstreamSupplier = downstream.supplier();
            BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
            BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                downstreamAccumulator.accept(container, t);
            };  // from here on, every intermediate result has type Map<K, A>
            // merges two intermediate maps (mapMerger), combining values with the downstream combiner
            BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
            // unchecked cast to Supplier<Map<K, A>>, i.e. to the intermediate result type
            Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

            if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
                return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
            }
            else {
                Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
                Function<Map<K, A>, M> finisher = intermediate -> {
                    intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                    M castResult = (M) intermediate;
                    return castResult;
                };
                return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
            }
        }
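
    To see how the three overloads relate from the caller's side, the following sketch calls each of them on a small word list. GroupingBySketch and its method names are hypothetical; only the Collectors calls come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupingBySketch {

    // One argument: the values are Lists and the map is whatever the default factory creates.
    public static Map<Integer, List<String>> byLength(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(String::length));
    }

    // Two arguments: a downstream collector replaces the default toList().
    public static Map<Integer, Long> countByLength(List<String> words) {
        return words.stream().collect(
                Collectors.groupingBy(String::length, Collectors.counting()));
    }

    // Three arguments: the map factory is chosen as well, here a sorted TreeMap.
    public static TreeMap<Integer, Set<String>> sortedSetsByLength(List<String> words) {
        return words.stream().collect(
                Collectors.groupingBy(String::length, TreeMap::new, Collectors.toSet()));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "World", "NiHao", "a", "b", "Hello");
        System.out.println(byLength(words));
        System.out.println(countByLength(words));
        System.out.println(sortedSetsByLength(words));
    }
}
```

    Each shorter overload simply fills in a default and delegates to the three-argument version, which is where the intermediate Map<K, A> is built and later finished.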


        // Adapts a collector that accepts U into one that accepts T by applying the mapper
        // function to each input element first; most useful in multi-level reductions
        public static <T, U, A, R>
        Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                                   Collector<? super U, A, R> downstream) {
            BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
            return new CollectorImpl<>(downstream.supplier(),
                                       (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                                       downstream.combiner(), downstream.finisher(),
                                       downstream.characteristics());
        }
    For example, given a stream of {@code Person}, to accumulate the set of last names in each city:
       Map<City, Set<String>> lastNamesByCity
       = people.stream().collect(groupingBy(Person::getCity, mapping(Person::getLastName, toSet())));
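    Here groupingBy splits the Person objects by city, and mapping then collects the last names of the people in each city into a Set.
    Inside mapping, Person::getLastName (the mapper) turns each Person into a String last name, and toSet() gathers those String names into a Set<String>.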
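
    The Javadoc example can be made runnable with a small stand-in for Person. In the sketch below the Person class is hypothetical, and a plain String replaces the City type, which the source does not define.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MappingSketch {

    // Hypothetical stand-in for the Person type used in the Javadoc example.
    public static final class Person {
        private final String city;
        private final String lastName;

        public Person(String city, String lastName) {
            this.city = city;
            this.lastName = lastName;
        }

        public String getCity() { return city; }

        public String getLastName() { return lastName; }
    }

    // Groups people by city, then maps each Person to a last name and collects the names into a Set.
    public static Map<String, Set<String>> lastNamesByCity(List<Person> people) {
        return people.stream().collect(
                Collectors.groupingBy(Person::getCity,
                        Collectors.mapping(Person::getLastName, Collectors.toSet())));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("Beijing", "Zhang"),
                new Person("Beijing", "Li"),
                new Person("Shanghai", "Zhang"),
                new Person("Beijing", "Zhang"));
        System.out.println(lastNamesByCity(people));
    }
}
```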


