编程语言
首页 > 编程语言> > java自定义stream

java自定义stream

作者:互联网

一、流程

1

        // 自定义集合,继承ArrayList,与ArrayList没啥区别
        SelfList<Apple> appleList = new SelfList<>();
import java.util.ArrayList;
import java.util.Iterator;

/**
 * 自定义集合,继承ArrayList,与ArrayList没啥区别
 */
public class SelfList<T> extends ArrayList<T> {
    /**
     * 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
     *
     * @return 头部管道(作用是持有数据源)
     */
    public SelfStream<T> selfStream() {
        Iterator<T> listIterator = super.iterator();
        return new SelfPipeline.SelfHead<>(listIterator);
    }
}
    /**
     * 头部管道:继承管道,属于管道的一种
     */
    static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        SelfHead(Iterator<?> source) {
            // 初始化头部管道
            super(source);
    /**
     * 初始化头部管道
     *
     * @param source 数据源
     */
    SelfPipeline(Iterator<?> source) {
        // 数据源
        this.sourceIterator = source;
        // 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
        this.sourceStage = this;

        // 上段管道,头部管道没有上段管道
        this.previousStage = null;

        // 头部管道的深度=0,下面的每段管道的深度依次+1
        this.depth = 0;
    }
        }

        // 头部管道无需sink
        @Override
        final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
            throw new UnsupportedOperationException();
        }
    }
        appleList.add(new Apple(1, "青色"));
        appleList.add(new Apple(2, "橙色"));
        appleList.add(new Apple(3, "红色"));
        appleList.add(new Apple(4, "绿色"));
        appleList.add(new Apple(5, "绿色"));
        appleList.add(new Apple(6, "紫色"));

2

        // 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
        SelfStream<Apple> pipelineHead = appleList.selfStream();
见SelfList

3

        // 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
        SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
    /**
     * 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
     *
     * @param predicate 断言型函数式接口
     * @return Op管道(FilterOp)
     */
    @Override
    public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        return new SelfStatelessOp<P_OUT, P_OUT>(this) {
    /**
     * Op管道:继承管道,属于管道的一种
     */
    abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        /**
         * @param upstream 上段管道
         */
        SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
            // 初始化Op管道
            super(upstream);
    /**
     * 初始化Op管道
     *
     * @param upstream 上段管道
     */
    SelfPipeline(SelfPipeline<?, P_IN> upstream) {
        // 自己的上段管道
        this.previousStage = upstream;
        // 上段管道的下段是自己
        upstream.nextStage = this;

        // 每一段管道都持有“头部管道”
        this.sourceStage = upstream.sourceStage;

        // 深度+1
        this.depth = upstream.depth + 1;
    }
        }
    }
       // 定义这段管道的职责(并由FilterSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
                // FilterSink
                return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
    /**
     * sink链
     */
    abstract class Chain<T, E_OUT> implements SelfSink<T> {
        // 下个sink
        protected final SelfSink<? super E_OUT> downstream;

        public Chain(SelfSink<? super E_OUT> downstream) {
            // 向下的单向链条
            this.downstream = downstream;
        }

        // 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    }

                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u)) {
                            // 触发下一段管道的动作,下沉
                            downstream.accept(u);
                        }
                    }
                };
            }
        };
    }

4

        // 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
        SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
    /**
     * 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
     *
     * @param mapper 函数型函数式接口:有入参和返回值
     * @return Op管道(MapOp)
     */
    @Override
    public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        return new SelfStatelessOp<P_OUT, R>(this) {
如上FilterOp
       // 定义这段管道的职责(并由MapSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
                // MapSink
                return new SelfSink.Chain<P_OUT, R>(sink) {
如上FilterOp
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

5

        /*
         * 执行终结操作:
         * 1,生成ReducingSink(设计为收集汇聚上游的流);
         * 2,生成MapSink,让MapSink向下链接ReducingSink;
         * 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
         * 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
         */
        SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
    /**
     * 终结操作
     *
     * @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
     * @return 最终结果:即汇聚最终结果都是在此方法中进行的
     */
    @Override
    public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
        // 在终结操作里尾部管道(TerminalOp)
        SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);
    // 生成尾部管道
    public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
        // 此例子是(Supplier<List<T>>) ArrayList::new
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        // 此例子是List::add
        BiConsumer<I, ? super T> accumulator = collector.accumulator();

        // 定义了ReducingSink的职责
        class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
            @Override
            public void begin(long size) {
                // state = ArrayList::new = 初始化的空集合
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                // 向集合state中添加元素t
                accumulator.accept(state, t);
            }
        }

        // 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
        return new SelfReduceOp<T, I, ReducingSink>() {
            // ReducingSink
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

     // 拉网
        A container = evaluate(terminalOp);
    /**
     * 使用terminalOp拉网
     *
     * @param terminalOp 使用statelessOp.collect方法的参数构造出来的
     * @param <R>
     * @return 拉网结果,ReducingSink的get方法得到最终集合
     */
    final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
        // 当前对象为map的statelessOp
        Iterator<?> iterator = sourceStage.sourceIterator;
        // 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
        return terminalOp.evaluateSequential(this, iterator);
        /**
         * 顺序评估管道
         */
        @Override
        public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
            // 生成尾部管道的sink
            S reducingSink = makeSink();
如上makeRef
       // 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
            S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);
    /**
     * 管道包装sink
     *
     * @return 评估管道结束后,返回终结操作管道的sink
     */
    @Override
    final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
        SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));
    /**
     * 管道包装sink
     *
     * @param sink 此例子是:传入的是ReducingSink
     * @return sink链的第一个对象
     */
    @Override
    final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
        for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(sink);
见FilterOp、MapOp的职责定义部分
        }
        return (SelfSink<P_IN>) sink;
    }

     // 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
        copyInto(firstSinkLink, iterator);
    /**
     * 遍历数据源元素,顺序执行sink的accept方法(consumer)
     *
     * @param wrappedSink sink链的第一个对象
     * @param iterator    数据源
     */
    @Override
    final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
        // 准备好尾部管道的收集装置
        wrappedSink.begin(-1);

        // 遍历流转和收集
        iterator.forEachRemaining(wrappedSink);
// 收尾工作(无)
        wrappedSink.end();
    }

     // 返回ReducingSink
        return sink;
    }

       // 返回ReducingSink持有和可以supplier的集合
            return wrappedReducingSink.get();
        }
    }
return (R) container;
    }

6

        System.out.println(terminalOpCollect);

7

        // 简写
        SelfList<Apple> apples = appleList.selfStream()
                .filter(item -> "绿色".equals(item.getColor()))
                .map(Apple::getWeight)
                .collect(SelfCollectors.toList());
        System.out.println(apples);

二、完整代码

package com.simple.boot.java_skill.selfλ;

import com.simple.boot.java_skill.functionprograming.Apple;

/**
 * main函数
 */
public class SelfLambdaTest {
    public static void main(String[] args) {
        // 自定义集合,继承ArrayList,与ArrayList没啥区别
        SelfList<Apple> appleList = new SelfList<>();

        appleList.add(new Apple(1, "青色"));
        appleList.add(new Apple(2, "橙色"));
        appleList.add(new Apple(3, "红色"));
        appleList.add(new Apple(4, "绿色"));
        appleList.add(new Apple(5, "绿色"));
        appleList.add(new Apple(6, "紫色"));

        // 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
        SelfStream<Apple> pipelineHead = appleList.selfStream();
        // 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
        SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
        // 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
        SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
        /*
         * 执行终结操作:
         * 1,生成ReducingSink(设计为收集汇聚上游的流);
         * 2,生成MapSink,让MapSink向下链接ReducingSink;
         * 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
         * 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
         */
        SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
        System.out.println(terminalOpCollect);

        // 简写
        SelfList<Apple> apples = appleList.selfStream()
                .filter(item -> "绿色".equals(item.getColor()))
                .map(Apple::getWeight)
                .collect(SelfCollectors.toList());
        System.out.println(apples);
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * 自定义集合,继承ArrayList,与ArrayList没啥区别
 */
public class SelfList<T> extends ArrayList<T> {
    /**
     * 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
     *
     * @return 头部管道(作用是持有数据源)
     */
    public SelfStream<T> selfStream() {
        Iterator<T> listIterator = super.iterator();
        return new SelfPipeline.SelfHead<>(listIterator);
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * 管道:流的一种具化实现
 */
public abstract class SelfPipeline<P_IN, P_OUT>
        extends SelfPipelineHelper<P_OUT> implements SelfStream<P_OUT> {
    // 数据源
    private Iterator<?> sourceIterator;
    // 头部管道(持有数据源)
    private final SelfPipeline sourceStage;

    // 上段管道
    private final SelfPipeline previousStage;

    // 下段管道
    private SelfPipeline nextStage;

    // 本段管道深度
    private int depth;

    /**
     * 初始化Op管道
     *
     * @param upstream 上段管道
     */
    SelfPipeline(SelfPipeline<?, P_IN> upstream) {
        // 自己的上段管道
        this.previousStage = upstream;
        // 上段管道的下段是自己
        upstream.nextStage = this;

        // 每一段管道都持有“头部管道”
        this.sourceStage = upstream.sourceStage;

        // 深度+1
        this.depth = upstream.depth + 1;
    }

    /**
     * 初始化头部管道
     *
     * @param source 数据源
     */
    SelfPipeline(Iterator<?> source) {
        // 数据源
        this.sourceIterator = source;
        // 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
        this.sourceStage = this;

        // 上段管道,头部管道没有上段管道
        this.previousStage = null;

        // 头部管道的深度=0,下面的每段管道的深度依次+1
        this.depth = 0;
    }

    /**
     * 头部管道:继承管道,属于管道的一种
     */
    static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        SelfHead(Iterator<?> source) {
            // 初始化头部管道
            super(source);
        }

        // 头部管道无需sink
        @Override
        final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Op管道:继承管道,属于管道的一种
     */
    abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        /**
         * @param upstream 上段管道
         */
        SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
            // 初始化Op管道
            super(upstream);
        }
    }

    abstract SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink);

    /**
     * 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
     *
     * @param predicate 断言型函数式接口
     * @return Op管道(FilterOp)
     */
    @Override
    public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        return new SelfStatelessOp<P_OUT, P_OUT>(this) {
            // 定义这段管道的职责(并由FilterSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
                // FilterSink
                return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u)) {
                            // 触发下一段管道的动作,下沉
                            downstream.accept(u);
                        }
                    }
                };
            }
        };
    }

    /**
     * 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
     *
     * @param mapper 函数型函数式接口:有入参和返回值
     * @return Op管道(MapOp)
     */
    @Override
    public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        return new SelfStatelessOp<P_OUT, R>(this) {
            // 定义这段管道的职责(并由MapSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
                // MapSink
                return new SelfSink.Chain<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

    /**
     * 终结操作
     *
     * @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
     * @return 最终结果:即汇聚最终结果都是在此方法中进行的
     */
    @Override
    public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
        // 在终结操作里尾部管道(TerminalOp)
        SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);

        // 拉网
        A container = evaluate(terminalOp);

        return (R) container;
    }


    /**
     * 使用terminalOp拉网
     *
     * @param terminalOp 使用statelessOp.collect方法的参数构造出来的
     * @param <R>
     * @return 拉网结果,ReducingSink的get方法得到最终集合
     */
    final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
        // 当前对象为map的statelessOp
        Iterator<?> iterator = sourceStage.sourceIterator;
        // 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
        return terminalOp.evaluateSequential(this, iterator);
    }

    /**
     * 管道包装sink
     *
     * @return 评估管道结束后,返回终结操作管道的sink
     */
    @Override
    final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
        SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));

        // 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
        copyInto(firstSinkLink, iterator);

        // 返回ReducingSink
        return sink;
    }

    /**
     * 遍历数据源元素,顺序执行sink的accept方法(consumer)
     *
     * @param wrappedSink sink链的第一个对象
     * @param iterator    数据源
     */
    @Override
    final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
        // 准备好尾部管道的收集装置
        wrappedSink.begin(-1);

        // 遍历流转和收集
        iterator.forEachRemaining(wrappedSink);

        // 收尾工作(无)
        wrappedSink.end();
    }

    /**
     * 管道包装sink
     *
     * @param sink 此例子是:传入的是ReducingSink
     * @return sink链的第一个对象
     */
    @Override
    final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
        for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(sink);
        }
        return (SelfSink<P_IN>) sink;
    }

}
package com.simple.boot.java_skill.selfλ;

import java.util.function.BiConsumer;
import java.util.function.Supplier;

public interface SelfCollector<T, A> {

    Supplier<A> supplier();

    BiConsumer<A, T> accumulator();
}
package com.simple.boot.java_skill.selfλ;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public final class SelfCollectors {

    public static <T> SelfCollector<T, ?> toList() {
        return new SelfCollectorImpl<>((Supplier<List<T>>) SelfList::new, List::add);
    }

    static class SelfCollectorImpl<T, A> implements SelfCollector<T, A> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;

        SelfCollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator) {
            this.supplier = supplier;
            this.accumulator = accumulator;
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;

/**
 * 管道工具类
 */
public abstract class SelfPipelineHelper<P_OUT> {

    abstract <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator);

    abstract <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator);

    abstract <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink);
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
 * 尾部管道工厂模式
 */
public class SelfReduceOpsFactory {
    // 生成尾部管道
    public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
        // 此例子是(Supplier<List<T>>) ArrayList::new
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        // 此例子是List::add
        BiConsumer<I, ? super T> accumulator = collector.accumulator();

        // 定义了ReducingSink的职责
        class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
            @Override
            public void begin(long size) {
                // state = ArrayList::new = 初始化的空集合
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                // 向集合state中添加元素t
                accumulator.accept(state, t);
            }
        }

        // 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
        return new SelfReduceOp<T, I, ReducingSink>() {
            // ReducingSink
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * 结果集
     */
    private static abstract class SelfBox<U> {
        U state;

        SelfBox() {
        }

        public U get() {
            return state;
        }
    }

    /**
     * 尾部管道
     */
    private static abstract class SelfReduceOp<T, R, S extends SelfTerminalSink<T, R>>
            implements SelfTerminalOp<T, R> {
        SelfReduceOp() {
        }

        // 生成尾部管道的sink,定义为抽象方法,终结操作终结结果有多种
        public abstract S makeSink();

        /**
         * 顺序评估管道
         */
        @Override
        public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
            // 生成尾部管道的sink
            S reducingSink = makeSink();

            // 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
            S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);

            // 返回ReducingSink持有和可以supplier的集合
            return wrappedReducingSink.get();
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Consumer;

/**
 * 下沉:一种Consumer函数式接口,执行指定动作
 */
public interface SelfSink<T> extends Consumer<T> {
    default void begin(long size) {
    }

    default void end() {
    }

    /**
     * sink链
     */
    abstract class Chain<T, E_OUT> implements SelfSink<T> {
        // 下个sink
        protected final SelfSink<? super E_OUT> downstream;

        public Chain(SelfSink<? super E_OUT> downstream) {
            // 向下的单向链条
            this.downstream = downstream;
        }

        // 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Function;
import java.util.function.Predicate;

public interface SelfStream<T> {
    SelfStream<T> filter(Predicate<? super T> predicate);

    <R> SelfStream<R> map(Function<? super T, ? extends R> mapper);

    <R, A> R collect(SelfCollector<? super T, A> collector);
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;

public interface SelfTerminalOp<E_IN, R> {
    <P_IN> R evaluateSequential(SelfPipelineHelper<E_IN> helper, Iterator<P_IN> iterator);
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Supplier;

/**
 * 尾部管道的sink
 */
public interface SelfTerminalSink<T, R> extends SelfSink<T>, Supplier<R> {
}

 

标签:java,自定义,stream,管道,sink,new,return,SelfSink,public
来源: https://www.cnblogs.com/seeall/p/16630280.html