Java New Features (6): Implementing Custom Collectors and the Collectors Class
Author: from the Internet
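- Implementing the Collector interface: first implement all of the interface's abstract methods in a class, then, in the main program, pass an instance of that class to collect.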
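    import java.util.Arrays;
    import java.util.Collections;
    import java.util.EnumSet;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    // A simple custom collector: collects the elements of a List into a Set
    public class CollectorSetTest<T> implements Collector<T, Set<T>, Set<T>> {

        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello", "World", "NiHao");
            Set<String> strings = list.stream().collect(new CollectorSetTest<String>());
            System.out.println(strings);
        }

        @Override
        public Supplier<Set<T>> supplier() {
            System.out.println("supplier invoked");
            return HashSet::new;
        }

        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked");
            // The first parameter must be the declared container type Set<T>,
            // not a concrete implementation class:
            // return HashSet<T>::add; // does not compile
            // return Set<T>::add;     // compiles
            return (set, item) -> set.add(item); // equivalent to Set<T>::add
        }

        @Override
        public BinaryOperator<Set<T>> combiner() {
            System.out.println("combiner invoked");
            return (set1, set2) -> {
                set1.addAll(set2);
                return set1;
            };
        }

        @Override
        public Function<Set<T>, Set<T>> finisher() {
            System.out.println("finisher invoked");
            // Because IDENTITY_FINISH is declared, the stream may elide the finisher,
            // so "finisher invoked" may never be printed.
            // return set -> set;
            return Function.identity();
        }

        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            return Collections.unmodifiableSet(
                    EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED));
        }
    }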
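For comparison, the same "collect into a Set" logic can be assembled without a dedicated class through the public `Collector.of` factory. This is a minimal sketch: the class name `CollectorOfSetDemo` and the method `toHashSet` are illustrative, not part of the original post. The three-function overload of `Collector.of` adds `IDENTITY_FINISH` by itself, so only `UNORDERED` is passed explicitly.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;

public class CollectorOfSetDemo {

    // Hypothetical helper: same behavior as CollectorSetTest, built with Collector.of
    static <T> Collector<T, Set<T>, Set<T>> toHashSet() {
        return Collector.<T, Set<T>>of(
                HashSet::new,                          // supplier: a new result container
                (set, item) -> set.add(item),          // accumulator: add one element
                (left, right) -> {                     // combiner: merge two partial results
                    left.addAll(right);
                    return left;
                },
                Collector.Characteristics.UNORDERED);  // IDENTITY_FINISH is added by Collector.of
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("Hello", "World", "NiHao", "Hello");
        Set<String> strings = list.stream().collect(toHashSet());
        System.out.println(strings.size()); // 3: the duplicate "Hello" is dropped
    }
}
```

The hand-written class remains useful when, as in the original, you want to log each method call; `Collector.of` is the shorter choice once the behavior is understood.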
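- Parallel streams: several threads may run, and each thread operates on its own result container. If the Collector declares Characteristics.CONCURRENT (and either the Collector is also UNORDERED or the stream itself is unordered), all threads operate on a single shared result container instead.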
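Sample code:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.EnumSet;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    // Input: Set<String>; output: Map<String, String> (each element maps to itself)
    public class CollectorSetTest2<T> implements Collector<T, Set<T>, Map<T, T>> {

        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello", "World", "NiHao", "Beijing", "Hello",
                    "a", "b", "c", "d", "e");
            Set<String> strings = new TreeSet<>();
            strings.addAll(list);
            System.out.println(strings);
            // Number of logical processors available to the JVM
            System.out.println(Runtime.getRuntime().availableProcessors());
            // By default a parallel stream runs in the common ForkJoinPool, whose parallelism is
            // availableProcessors() - 1; the calling thread also takes part, so there is roughly
            // one thread per logical processor. With Intel Hyper-Threading, each physical core
            // counts as two logical processors.
            Map<String, String> map = strings.parallelStream().collect(new CollectorSetTest2<>()); // (1)
            // Map<String, String> map1 = strings.stream().parallel()
            //         .collect(new CollectorSetTest2<>()); // exactly equivalent to (1)
            System.out.println(map);
        }

        @Override
        public Supplier<Set<T>> supplier() {
            // From the Collector Javadoc, the following two computations must be equivalent:
            // A a1 = supplier.get();
            // accumulator.accept(a1, t1);
            // accumulator.accept(a1, t2);
            // R r1 = finisher.apply(a1);  // result without splitting
            //
            // A a2 = supplier.get();
            // accumulator.accept(a2, t1);
            // A a3 = supplier.get();
            // accumulator.accept(a3, t2);
            // R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
            System.out.println("supplier invoked");
            // return HashSet::new;
            return () -> {
                // Printed once per intermediate container; used to count the containers
                // when testing Characteristics.CONCURRENT
                System.out.println("---------");
                return new HashSet<T>();
            };
        }

        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked");
            return (set, item) -> {
                // Shows which thread works on which container in the parallel stream
                System.out.println(set + "," + Thread.currentThread().getName());
                set.add(item);
                // With Characteristics.CONCURRENT all threads share one (non-thread-safe) HashSet,
                // so printing set (which iterates over it) while another thread adds to it can
                // throw ConcurrentModificationException: "it is not generally permissible for one
                // thread to modify a Collection while another thread is iterating over it"
            };
        }

        @Override
        public BinaryOperator<Set<T>> combiner() {
            // The returned function is applied only in a parallel stream WITHOUT
            // Characteristics.CONCURRENT, to merge the per-thread containers
            System.out.println("combiner invoked");
            return (s, s1) -> {
                s.addAll(s1);
                return s;
            };
        }

        @Override
        public Function<Set<T>, Map<T, T>> finisher() {
            System.out.println("finisher invoked");
            return set -> {
                Map<T, T> map = new HashMap<>();
                set.stream().forEach(item -> map.put(item, item));
                return map;
            };
        }

        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            // IDENTITY_FINISH must not be declared here: the finisher really converts Set into Map.
            // return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.CONCURRENT));
            return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
        }
    }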
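The commented-out CONCURRENT variant above is unsafe because every thread shares one plain HashSet. A minimal sketch of a thread-safe alternative, assuming a concurrent container is acceptable: the hypothetical class `ConcurrentSetToMapCollector` keeps the same Set-to-Map finisher but accumulates into `ConcurrentHashMap.newKeySet()`, and counts how many containers the stream requests. Per the `Collector` Javadoc, a concurrent reduction uses a single shared result container, so the count should be 1.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

// Hypothetical thread-safe variant of CollectorSetTest2 that declares CONCURRENT
public class ConcurrentSetToMapCollector<T> implements Collector<T, Set<T>, Map<T, T>> {

    // Counts how many result containers the stream requests
    final AtomicInteger containersCreated = new AtomicInteger();

    @Override
    public Supplier<Set<T>> supplier() {
        return () -> {
            containersCreated.incrementAndGet();
            // A concurrent Set: safe for simultaneous add() calls, and its
            // iterators never throw ConcurrentModificationException
            return ConcurrentHashMap.newKeySet();
        };
    }

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        return (set, item) -> set.add(item);
    }

    @Override
    public BinaryOperator<Set<T>> combiner() {
        // Required by the interface; not applied during a concurrent reduction
        return (left, right) -> {
            left.addAll(right);
            return left;
        };
    }

    @Override
    public Function<Set<T>, Map<T, T>> finisher() {
        // Same Set -> Map conversion as CollectorSetTest2: each element maps to itself
        return set -> {
            Map<T, T> map = new HashMap<>();
            set.forEach(item -> map.put(item, item));
            return map;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        // A concurrent reduction needs a parallel stream plus UNORDERED here (or an unordered stream)
        return Collections.unmodifiableSet(
                EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("Hello", "World", "NiHao", "Beijing", "Hello",
                "a", "b", "c", "d", "e");
        ConcurrentSetToMapCollector<String> collector = new ConcurrentSetToMapCollector<>();
        Map<String, String> map = list.parallelStream().collect(collector);
        System.out.println(map.size());                         // 9 distinct strings
        System.out.println(collector.containersCreated.get());  // 1: all threads share one container
    }
}
```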
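- The static factory methods in Collectors are implemented in one of two ways:
1) directly through CollectorImpl;
2) through the reducing method, which is itself implemented through CollectorImpl.
- A number (such as an int) is a value type, while an array is a reference type.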
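That is why summingInt in the JDK source does not accumulate into an int. The accumulator is a BiConsumer that returns nothing, so it can only work by mutating the container it is handed; an int is passed by value and an Integer is immutable, so neither can be updated in place. An int[] is a reference, so the accumulator can modify a[0] and the change stays visible:

    /**
     * Returns a {@code Collector} that produces the sum of a integer-valued
     * function applied to the input elements.  If no elements are present,
     * the result is 0.
     *
     * @param <T> the type of the input elements
     * @param mapper a function extracting the property to be summed
     * @return a {@code Collector} that produces the sum of a derived property
     */
    public static <T> Collector<T, ?, Integer>
    summingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new int[1], // a one-element array serves as the result container
                (a, t) -> { a[0] += mapper.applyAsInt(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0],
                CH_NOID);
    }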
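Since CollectorImpl and CH_NOID are not public API, the array-as-container idea can be reproduced with the public `Collector.of` factory instead. A minimal sketch: `SummingDemo`, `mySummingInt` and `sumViaReducing` are hypothetical names. The second method shows the other implementation route, `Collectors.reducing(identity, mapper, op)`, computing the same sum.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class SummingDemo {

    // Hypothetical helper mirroring summingInt: an int[1] is the mutable container
    static <T> Collector<T, int[], Integer> mySummingInt(ToIntFunction<? super T> mapper) {
        return Collector.<T, int[], Integer>of(
                () -> new int[1],                           // container starts as {0}
                (a, t) -> a[0] += mapper.applyAsInt(t),     // add one element in place
                (a, b) -> { a[0] += b[0]; return a; },      // merge two partial sums
                a -> a[0]);                                 // unwrap the final int
    }

    // The same sum via reducing: no mutable container, values are combined with Integer::sum
    static <T> Collector<T, ?, Integer> sumViaReducing(ToIntFunction<? super T> mapper) {
        return Collectors.reducing(0, t -> mapper.applyAsInt(t), Integer::sum);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("Hello", "World", "NiHao");
        Integer viaArray = list.stream().collect(mySummingInt(String::length));
        Integer viaReducing = list.stream().collect(sumViaReducing(String::length));
        Integer viaJdk = list.stream().collect(Collectors.summingInt(String::length));
        System.out.println(viaArray + " " + viaReducing + " " + viaJdk); // 15 15 15
    }
}
```

The reducing version boxes a new Integer at every step, which is one reason the JDK prefers the array container for the primitive summing collectors.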
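- Background:
The primitive wrapper classes (Integer, Long, Short, Double, Float, Character, Byte, Boolean) are all immutable as well.
Immutable collections:
1) are thread-safe;
2) are safe to hand to untrusted libraries;
3) save space and time, because an object that does not need to support modification can use memory more efficiently;
4) Note: to turn a mutable collection into an immutable one, copy all of its elements into a new immutable collection and forbid any further changes to the original. Collections.unmodifiableSet and its siblings only return a read-only view: changes made through the original collection remain visible through that view.
- Source analysis of groupingBy in Collectors: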
    // classifier: maps an input element of type T (declared as ? super T) to its key of type K;
    // e.g. given a student, return the student's age.
    // Result: a Collector whose input type is T (student), whose intermediate type ? is left to
    // the implementation, and whose result is a Map with age as the key and the list of
    // students of that age as the value.
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier) {
        return groupingBy(classifier, toList());
    }

    // Takes a classifier and a downstream collector and returns a collector:
    // elements are grouped by the classifier, and each group is collected by downstream.
    public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
        return groupingBy(classifier, HashMap::new, downstream);
    }

    // Takes three arguments: a Function (classifier), a Supplier (mapFactory) and a Collector (downstream).
    // classifier and mapFactory decide how elements are grouped and which Map holds the groups;
    // downstream's supplier, accumulator, combiner and finisher do the per-group collecting.
    // The result is a new CollectorImpl.
    public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            // K: the classifier's result type, i.e. the group key
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            // A: downstream's mutable intermediate container type; if the key has no value yet,
            // a fresh container from downstreamSupplier is stored as its value
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            // The two steps above prepare the container that this call accumulates into
            downstreamAccumulator.accept(container, t);
        };
        // From here on the intermediate result type is Map<K, A>.
        // mapMerger merges two intermediate maps, combining the values of shared keys
        // with downstream's combiner
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        // Cast to Supplier<Map<K, A>>, the intermediate result type
        @SuppressWarnings("unchecked")
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            // downstream needs no finisher, so Map<K, A> already is the final M:
            // return the new CollectorImpl directly
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            // Apply downstream's finisher to every value in place, then cast the map to M
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }
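To see the moving parts of the three-argument groupingBy in isolation, here is a simplified re-implementation using the public `Collector.of` factory, because CollectorImpl, mapMerger and CH_ID are not public API. It is a sketch under stated assumptions: the name `myGroupingBy` is hypothetical, the map is always a HashMap, and the finisher always runs instead of taking the IDENTITY_FINISH shortcut.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class GroupingBySketch {

    // Hypothetical simplified groupingBy(classifier, downstream)
    static <T, K, A, D> Collector<T, ?, Map<K, D>> myGroupingBy(
            Function<? super T, ? extends K> classifier,
            Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BinaryOperator<A> downstreamCombiner = downstream.combiner();
        Function<A, D> downstreamFinisher = downstream.finisher();

        return Collector.<T, Map<K, A>, Map<K, D>>of(
                HashMap::new,
                (m, t) -> {
                    K key = Objects.requireNonNull(classifier.apply(t),
                            "element cannot be mapped to a null key");
                    // One downstream container per key, created on first use
                    A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                    downstreamAccumulator.accept(container, t);
                },
                (m1, m2) -> {
                    // Like mapMerger: keys present in both maps get their containers combined
                    m2.forEach((k, v) -> m1.merge(k, v, downstreamCombiner));
                    return m1;
                },
                m -> {
                    // Finish every group's container into the downstream result type D
                    Map<K, D> result = new HashMap<>();
                    m.forEach((k, v) -> result.put(k, downstreamFinisher.apply(v)));
                    return result;
                });
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "cc", "d", "eee");
        Map<Integer, List<String>> byLength =
                words.stream().collect(myGroupingBy(String::length, Collectors.toList()));
        Map<Integer, Long> countByLength =
                words.stream().collect(myGroupingBy(String::length, Collectors.counting()));
        System.out.println(byLength);       // {1=[a, d], 2=[bb, cc], 3=[eee]}
        System.out.println(countByLength);  // {1=2, 2=2, 3=1}
    }
}
```

Replacing each value on a separate result map trades a little extra allocation for avoiding the unchecked casts the JDK version needs.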
Source analysis of mapping, a collector commonly used as the downstream of groupingBy:
    // Adapts a Collector that accepts elements of type U into one that accepts elements of type T,
    // by applying mapper (T -> U) to each element before accumulation. Most useful in
    // multi-level reductions, e.g. as the downstream of groupingBy.
    // U is the input type of the downstream collector, R is its result type.
    public static <T, U, A, R>
    Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                               Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CollectorImpl<>(downstream.supplier(),
                                   (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                                   downstream.combiner(), downstream.finisher(),
                                   downstream.characteristics());
    }
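mapping changes only the accumulator; supplier, combiner, finisher and characteristics are passed through from downstream untouched. A minimal sketch of the same idea on top of the public `Collector.of` factory, assuming the hypothetical name `myMapping`; the downstream characteristics are converted to the varargs array that `Collector.of` expects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class MappingSketch {

    // Hypothetical re-implementation of mapping: apply mapper, then delegate to downstream
    static <T, U, A, R> Collector<T, ?, R> myMapping(Function<? super T, ? extends U> mapper,
                                                      Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return Collector.<T, A, R>of(
                downstream.supplier(),
                (a, t) -> downstreamAccumulator.accept(a, mapper.apply(t)), // the only new piece
                downstream.combiner(),
                downstream.finisher(),
                // Pass the downstream characteristics through unchanged
                downstream.characteristics().toArray(new Collector.Characteristics[0]));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "World", "a", "bb");
        Set<Integer> lengths = words.stream().collect(myMapping(String::length, Collectors.toSet()));
        System.out.println(lengths); // [1, 2, 5]
    }
}
```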
For example, given a stream of Person, to accumulate the set of last names in each city:

    Map<City, Set<String>> lastNamesByCity
        = people.stream().collect(groupingBy(Person::getCity,
                                             mapping(Person::getLastName, toSet())));

For the Persons in each city, mapping collects their last names into a Set. Inside mapping, Person::getLastName (the mapper) turns a Person into a String last name, and toSet() collects those Strings into a Set<String>. So, for each city, mapping yields a Set<String>.
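A runnable version of this Javadoc example, as a sketch: the `Person` class here is hypothetical, and a plain `String` stands in for the `City` type, which the example does not define. Static imports keep the collector expression identical to the Javadoc line.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;

public class LastNamesByCity {

    // Hypothetical Person type; city is a String instead of a City class
    static final class Person {
        private final String city;
        private final String lastName;

        Person(String city, String lastName) {
            this.city = city;
            this.lastName = lastName;
        }

        String getCity() { return city; }
        String getLastName() { return lastName; }
    }

    static Map<String, Set<String>> lastNamesByCity(List<Person> people) {
        // groupingBy splits people by city; mapping turns each Person into a last
        // name before toSet() collects the names of that city
        return people.stream()
                .collect(groupingBy(Person::getCity, mapping(Person::getLastName, toSet())));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("Beijing", "Li"),
                new Person("Beijing", "Wang"),
                new Person("Beijing", "Li"),
                new Person("Shanghai", "Zhang"));
        Map<String, Set<String>> result = lastNamesByCity(people);
        System.out.println(result.get("Beijing").size()); // 2: "Li" is stored once in the Set
        System.out.println(result.get("Shanghai"));       // [Zhang]
    }
}
```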
Source: https://blog.csdn.net/liuzewei2015/article/details/90378176