
JAVA New Features (6): Implementing Custom Collectors, and the Collectors Class

Author: Internet

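  1. Implementing the Collector interface: first implement all of the interface's abstract methods in a class, then in the main program pass an instance of that class to collect.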
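    import java.util.*;
    import java.util.function.*;
    import java.util.stream.Collector;

    public class CollectorSetTest<T> implements Collector<T,Set<T>,Set<T>> {
    //A simple custom collector: collect the elements of a List into a Set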
    
        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello","World","NiHao");
            Set<String> strings = list.stream().collect(new CollectorSetTest<String>());
            System.out.println(strings);
        }
    
        @Override
        public Supplier<Set<T>> supplier() {
            System.out.println("supplier invoked");
            return HashSet::new;
        }
    
        @Override
        public BiConsumer<Set<T>,T> accumulator() {
            System.out.println("accumulator invoked");
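            //HashSet<T>::add would not compile: the accumulator's first parameter is declared as Set<T>,
            //so the method reference must use that generic type rather than a concrete implementation class
    //        return HashSet<T>::add;//does not compile
    //        return Set<T>::add;//compiles
            return (set,item)->set.add(item);//equivalent to Set<T>::add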
        }
    
        @Override
        public BinaryOperator<Set<T>> combiner() {
            System.out.println("combiner invoked");
            return (set1,set2)->{set1.addAll(set2); return set1;};
        }
    
        @Override
        public Function<Set<T>,Set<T>> finisher() {
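            System.out.println("finisher invoked");//with IDENTITY_FINISH the stream uses the container as the result directly, so finisher() is typically not called and this line does not print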
    //        return set->set;
            return Function.identity();
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            return Collections.unmodifiableSet(
                    EnumSet.of(Characteristics.IDENTITY_FINISH,Characteristics.UNORDERED));
        }
    }
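The same collector can also be built without a dedicated class through the public factory `Collector.of`. The sketch below assumes Java 8 or later. `CollectorOfDemo` and `toHashSet` are hypothetical names. The `Collector.of` overload without a finisher marks the collector IDENTITY_FINISH on its own, so only UNORDERED is passed explicitly.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;

class CollectorOfDemo {
    // Same behaviour as CollectorSetTest, built with the public Collector.of factory.
    // The overload without a finisher adds IDENTITY_FINISH automatically.
    static <T> Collector<T, Set<T>, Set<T>> toHashSet() {
        return Collector.of(
                HashSet::new,                                          // supplier: new empty container
                Set::add,                                              // accumulator: add one element
                (left, right) -> { left.addAll(right); return left; }, // combiner: merge partial sets
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("Hello", "World", "NiHao");
        Set<String> strings = list.stream().collect(toHashSet());
        System.out.println(strings);
    }
}
```

Writing the full class, as in the example above, is mainly useful when you want to trace when each method is called, as this article does.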



     
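  2. Parallel streams: a parallel stream can use multiple threads, and each thread works on its own result container. If the Collector declares Characteristics.CONCURRENT, all threads operate on a single shared result container instead. This concurrent reduction happens only when the collector is also UNORDERED or the stream itself is unordered.
    Example code: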
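    import java.util.*;
    import java.util.function.*;
    import java.util.stream.Collector;

    public class CollectorSetTest2<T> implements Collector<T, Set<T>,Map<T,T>> {
    //    Input: Set<String>   Output: Map<String,String> (each element maps to itself)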
    
        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello","World","NiHao","Beijing","Hello","a","b","c","d","e");
            Set<String> strings = new TreeSet<>();
            strings.addAll(list);
            System.out.println(strings);
    
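            System.out.println(Runtime.getRuntime().availableProcessors());//number of logical processors
    //      By default a parallel stream runs on the common ForkJoinPool, whose parallelism is
    //      availableProcessors() - 1, plus the calling thread. With Intel Hyper-Threading, the number
    //      of logical processors can be twice the number of physical cores.
            Map<String,String> map = strings.parallelStream().collect(new CollectorSetTest2<>());//1
    //        Map<String,String> map1 = strings.stream().parallel().
    //        collect(new CollectorSetTest2<>());//equivalent to 1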
            System.out.println(map);
        }
    
        @Override
        public Supplier<Set<T>> supplier() {
    //      From the Collector Javadoc (associativity constraint): for any elements t1 and t2, r1 and r2 below must be equivalent:
    // * <pre>{@code
    // *     A a1 = supplier.get();
    // *     accumulator.accept(a1, t1);
    // *     accumulator.accept(a1, t2);
    // *     R r1 = finisher.apply(a1);  // result without splitting
    // *
    // *     A a2 = supplier.get();
    // *     accumulator.accept(a2, t1);
    // *     A a3 = supplier.get();
    // *     accumulator.accept(a3, t2);
    // *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
    // * } </pre>
    
            System.out.println("supplier invoked");
    //        return HashSet::new;
            return ()->{
                System.out.println("---------");
                //prints once per container created: counts the intermediate result containers when testing Characteristics.CONCURRENT
                return new HashSet<T>();
            };
        }
    
        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked");
            return (set,item)->{
                System.out.println(set + "," + Thread.currentThread().getName());//shows which thread touches which container in a parallel stream
                set.add(item);
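    //            With Characteristics.CONCURRENT (plus UNORDERED), every thread shares one HashSet, so the println above
    //            may throw ConcurrentModificationException: "it is not generally permissible for one thread to modify
    //            a Collection while another thread is iterating over it". HashSet is not thread-safe in any case.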
            };
        }
        @Override
        public BinaryOperator<Set<T>> combiner() {
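    //        The returned function is applied only when a parallel stream uses several containers, i.e. WITHOUT a concurrent
    //        reduction. With CONCURRENT (and UNORDERED) there is only one shared container, so nothing needs merging.
    //        combiner() itself is still invoked (and prints) for a sequential stream; only the returned function goes unused.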
            System.out.println("combiner invoked");
            return (s,s1)->{s.addAll(s1);return s;};
        }
    
        @Override
        public Function<Set<T>,Map<T,T>> finisher() {
            System.out.println("finisher invoked");
            return set -> {
                Map<T,T> map = new HashMap<>();
                set.stream().forEach(item->map.put(item, item));
                return map;
            };
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
    //        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED,Characteristics.CONCURRENT));
            return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
        }
    }
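Using a thread-safe container lets you observe the CONCURRENT behaviour from point 2 without the ConcurrentModificationException risk noted in the comments above. This is a sketch assuming Java 8+, and `ConcurrentSetCollector` is a hypothetical name. The collector declares CONCURRENT and UNORDERED, creates its container with `ConcurrentHashMap.newKeySet()`, and counts supplier calls with an `AtomicInteger` (illustrative instrumentation, not part of the original). In the OpenJDK implementation, a concurrent reduction calls the supplier once and does not use the combiner.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class ConcurrentSetCollector<T> implements Collector<T, Set<T>, Set<T>> {
    // Hypothetical instrumentation: counts how many result containers the stream asks for.
    final AtomicInteger containers = new AtomicInteger();

    @Override
    public Supplier<Set<T>> supplier() {
        return () -> {
            containers.incrementAndGet();
            // Thread-safe set: in a concurrent reduction every thread adds to this one instance.
            return ConcurrentHashMap.newKeySet();
        };
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        return Set::add; // safe from several threads because the set is concurrent
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        // Required by the interface; only used if the stream does not take the concurrent path.
        return (left, right) -> { left.addAll(right); return left; };
    }

    @Override
    public Function<Set<T>, Set<T>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(
                Characteristics.CONCURRENT, Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH));
    }

    public static void main(String[] args) {
        ConcurrentSetCollector<Integer> collector = new ConcurrentSetCollector<>();
        Set<Integer> result = IntStream.range(0, 1000).boxed().parallel().collect(collector);
        System.out.println(result.size() + " elements, " + collector.containers.get() + " container(s)");
    }
}
```

If you drop CONCURRENT from characteristics(), the same parallel stream usually creates several containers and merges them with the combiner.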
    

     

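  3. The static factory methods of the Collectors class are implemented in one of two ways:
    1) directly through CollectorImpl, a package-private Collector implementation nested in Collectors;
    2) through the reducing method, which is itself implemented through CollectorImpl.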
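User code cannot call CollectorImpl because it is package-private, but `Collectors.reducing` is public. The sketch below illustrates the second route by expressing counting as a reduction: each element is mapped to 1L and the values are summed. The result should agree with `Collectors.counting()`. `ReducingDemo` and `countingViaReducing` are hypothetical names. Whether the JDK's own `counting()` is written this way differs between versions, so `counting()` serves here only as a reference result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ReducingDemo {
    // Counting expressed through Collectors.reducing: map every element to 1L, then add them up.
    static <T> Collector<T, ?, Long> countingViaReducing() {
        return Collectors.reducing(0L, e -> 1L, Long::sum);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "World", "NiHao");
        long viaReducing = words.stream().collect(countingViaReducing());
        long viaCounting = words.stream().collect(Collectors.counting());
        System.out.println(viaReducing + " " + viaCounting); // prints "3 3"
    }
}
```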
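  4. A number (int) is a value type, while an array is a reference type.
    A collector's accumulator returns nothing, so it can only update the result container in place. A bare int cannot be updated that way, but an array element can. That is why summingInt in the JDK source uses a one-element int[] as its container: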
        /**
         * Returns a {@code Collector} that produces the sum of a integer-valued
         * function applied to the input elements.  If no elements are present,
         * the result is 0.
         *
         * @param <T> the type of the input elements
         * @param mapper a function extracting the property to be summed
         * @return a {@code Collector} that produces the sum of a derived property
         */
        public static <T> Collector<T, ?, Integer>
        summingInt(ToIntFunction<? super T> mapper) {
            return new CollectorImpl<>(
                    () -> new int[1],//a one-element array serves as the mutable result container
                    (a, t) -> { a[0] += mapper.applyAsInt(t); },
                    (a, b) -> { a[0] += b[0]; return a; },
                    a -> a[0], CH_NOID);
        }
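The same int[1] trick works outside the JDK through the public `Collector.of` overload that takes a finisher, since CollectorImpl and CH_NOID are not accessible from user code. This is a sketch, and `summingIntSketch` is a hypothetical name. It passes no characteristics, so, like CH_NOID, it does not claim IDENTITY_FINISH: the container type int[] differs from the result type Integer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class SummingIntSketch {
    // Same shape as summingInt, built with the public Collector.of factory.
    static <T> Collector<T, int[], Integer> summingIntSketch(ToIntFunction<? super T> mapper) {
        return Collector.of(
                () -> new int[1],                            // supplier: a one-element box holding 0
                (a, t) -> { a[0] += mapper.applyAsInt(t); }, // accumulator: update the box in place
                (a, b) -> { a[0] += b[0]; return a; },       // combiner: fold b's partial sum into a
                a -> a[0]);                                  // finisher: unwrap (autoboxed to Integer)
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "World", "NiHao");
        int mine = words.stream().collect(summingIntSketch(String::length));
        int jdk = words.stream().collect(Collectors.summingInt(String::length));
        System.out.println(mine + " " + jdk); // prints "15 15"
    }
}
```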

     

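  5. Background:
    The primitive wrapper classes (Integer, Long, Short, Double, Float, Character, Byte, Boolean) are all immutable as well.
    Immutable collections:
    1) are thread-safe;
    2) are safe to hand to untrusted libraries;
    3) save space and time when an object never needs to support modification, making more efficient use of memory;
    4) note: converting a mutable collection into an immutable one means copying all of its elements into a new immutable collection, which then rejects every modification. Merely wrapping the original is not enough, because later changes to the original would still show through.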
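Point 4 is easy to check with `Collections.unmodifiableSet`, the wrapper used in the characteristics() methods above. It returns a read-only view rather than a copy, so later changes to the original set remain visible through it. The sketch below compares such a view with a wrapper around a copied set. `UnmodifiableDemo` and its methods are hypothetical names.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class UnmodifiableDemo {
    // Returns {size of the view, size of the copy} after the original set is modified.
    static int[] sizesAfterMutation() {
        Set<String> original = new HashSet<>();
        original.add("a");

        Set<String> view = Collections.unmodifiableSet(original);                // read-only view, no copy
        Set<String> copy = Collections.unmodifiableSet(new HashSet<>(original)); // elements copied first

        original.add("b"); // modify the backing collection afterwards
        return new int[] {view.size(), copy.size()};
    }

    // The wrapper rejects writes made through it.
    static boolean viewRejectsWrites() {
        Set<String> view = Collections.unmodifiableSet(new HashSet<String>());
        try {
            view.add("c");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] sizes = sizesAfterMutation();
        System.out.println(sizes[0] + " " + sizes[1]); // prints "2 1": the view sees "b", the copy does not
        System.out.println(viewRejectsWrites());       // prints "true"
    }
}
```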
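  6. Source walkthrough of groupingBy in Collectors:
        //classifier: takes an argument of type T (or a supertype) and returns a key of type K; e.g. given a student, return the student's age
        //Collector: the input is a student, the intermediate type ? is chosen internally, and the result is a Map whose key is the age and whose value is the list of students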
        public static <T, K> Collector<T, ?, Map<K, List<T>>>
        groupingBy(Function<? super T, ? extends K> classifier) {
            return groupingBy(classifier, toList());
        }
    
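        //Takes a downstream Collector plus the classifier and returns a new collector;
        //idea: the classifier decides which group each element belongs to, and downstream collects the elements within each group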
        public static <T, K, A, D>
        Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                              Collector<? super T, A, D> downstream) {
            return groupingBy(classifier, HashMap::new, downstream);
        }
    
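        //Takes three arguments: a Function (classifier), a Supplier (mapFactory) and a Collector (downstream);
        //it calls downstream's supplier, accumulator, combiner and finisher to do the per-group collection:
        //classifier and mapFactory define the grouping, downstream collects each group, and the result is a new CollectorImpl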
        public static <T, K, D, A, M extends Map<K, D>>
        Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                      Supplier<M> mapFactory,
                                      Collector<? super T, A, D> downstream) {
            Supplier<A> downstreamSupplier = downstream.supplier();
            BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
            BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                //K: the return type of the classifier
                A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                //A: the intermediate mutable container type; if the key has no value yet, a new container from downstreamSupplier is created and stored as its value
                downstreamAccumulator.accept(container, t);
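                //the first two steps prepare for this one: they compute the container (of type A) passed to downstreamAccumulator
            };  //the intermediate result is now of type Map<K, A>
            BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());//merges intermediate maps (mapMerger), combining the values of equal keys with downstream's combiner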
            @SuppressWarnings("unchecked")
            Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
            //unchecked cast to Supplier<Map<K, A>>, i.e. to the intermediate result type
            if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
                return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
                //downstream needs no finisher, so the Map<K, A> is already the result: return the new CollectorImpl (collector)
            }
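            else {
                //otherwise the downstream finisher is still needed: replaceAll turns every A container into its final D value in place,
                //and the map is then cast to M (unchecked casts that rely on generic type erasure)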
                @SuppressWarnings("unchecked")
                Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
                Function<Map<K, A>, M> finisher = intermediate -> {
                    intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                    @SuppressWarnings("unchecked")
                    M castResult = (M) intermediate;
                    return castResult;
                };
                return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
            }
        }
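Here is a usage sketch of the overloads above, with a hypothetical `Student` class (name, age, score). The one-argument form delegates to HashMap plus toList(). The three-argument form supplies a TreeMap and summingInt. Because summingInt is built with CH_NOID (see point 4), groupingBy takes the else branch: during collection each key maps to an int[] container, and replaceAll swaps it for the Integer sum at the end.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingDemo {
    // Hypothetical element type, for illustration only.
    static class Student {
        private final String name;
        private final int age;
        private final int score;

        Student(String name, int age, int score) {
            this.name = name;
            this.age = age;
            this.score = score;
        }

        String getName() { return name; }
        int getAge() { return age; }
        int getScore() { return score; }
    }

    // One-argument groupingBy: HashMap + toList() downstream (IDENTITY_FINISH branch).
    static Map<Integer, List<Student>> byAge(List<Student> students) {
        return students.stream().collect(Collectors.groupingBy(Student::getAge));
    }

    // Three-argument groupingBy: TreeMap factory + summingInt downstream (finisher branch).
    static TreeMap<Integer, Integer> totalScoreByAge(List<Student> students) {
        return students.stream().collect(
                Collectors.groupingBy(Student::getAge, TreeMap::new, Collectors.summingInt(Student::getScore)));
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("Tom", 18, 90), new Student("Amy", 19, 85), new Student("Bob", 18, 70));
        System.out.println(byAge(students).get(18).size()); // prints 2
        System.out.println(totalScoreByAge(students));      // prints {18=160, 19=85}
    }
}
```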

    Source walkthrough of mapping:
     

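        //Adapts a downstream collector that accepts U into one that accepts T by first applying mapper (T -> U) to each element;
        //most useful in multi-level reductions, e.g. downstream of groupingBy. U is the input type of downstream, R its output type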
        public static <T, U, A, R>
        Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                                   Collector<? super U, A, R> downstream) {
            BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
            return new CollectorImpl<>(downstream.supplier(),
                                       (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                                       downstream.combiner(), downstream.finisher(),
                                       downstream.characteristics());
        }
    For example, given a stream of {@code Person}, to accumulate the set of last names in each city:
       Map<City, Set<String>> lastNamesByCity
       = people.stream().collect(groupingBy(Person::getCity, mapping(Person::getLastName, toSet())));
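    For each city, mapping collects that city's Person objects as a set of last names (a Set).
    In mapping, Person::getLastName (the mapper) turns a Person into its String last name, and toSet() collects those Strings into a Set<String>.
    So for each city mapping yields a Set<String>, and the overall result is a Map<City, Set<String>>.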
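Below is a runnable version of the Javadoc example. It assumes a minimal hypothetical `Person` class in which the city is a plain String rather than a `City` type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;

class MappingDemo {
    // Hypothetical Person type; a String stands in for the Javadoc's City class.
    static class Person {
        private final String city;
        private final String lastName;

        Person(String city, String lastName) {
            this.city = city;
            this.lastName = lastName;
        }

        String getCity() { return city; }
        String getLastName() { return lastName; }
    }

    // groupingBy picks the city; mapping turns each Person into a last name before toSet() collects it.
    static Map<String, Set<String>> lastNamesByCity(List<Person> people) {
        return people.stream().collect(groupingBy(Person::getCity, mapping(Person::getLastName, toSet())));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("Beijing", "Li"), new Person("Beijing", "Wang"),
                new Person("Beijing", "Li"), new Person("Shanghai", "Zhang"));
        Map<String, Set<String>> result = lastNamesByCity(people);
        System.out.println(new TreeSet<>(result.get("Beijing"))); // prints [Li, Wang] (TreeSet only sorts the output)
        System.out.println(result.get("Shanghai"));               // prints [Zhang]
    }
}
```

Because toSet() is the downstream of mapping, duplicate last names within the same city collapse into a single entry.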
    

Source: https://blog.csdn.net/liuzewei2015/article/details/90378176