编程语言
首页 > 编程语言> > Java8 Stream源码精讲(二):Stream创建原理深度解析

Java8 Stream源码精讲(二):Stream创建原理深度解析

作者:互联网

简介

Java8 Stream源码精讲(一):从一个简单的例子入手

上一篇文章,通过分析一个使用Stream操作数据的例子,讲解了构建Stream,经过中间操作map()和filter()方法调用返回一个ReferencePipeline链表,调用终止操作forEach()将声明的函数构造成为一个sink链表,最终每一个元素都会被传入Sink#accept()方法处理。本章将通过重点分析创建Stream的源码,了解Stream的构建过程。

Spliterator是什么

在分析Stream构建之前,需要填一下上一章的坑,还记得吗,在上一章分析Stream流程的时候,构建Stream传入了一个Spliterator对象,当时只是说它是一个类似迭代器一样的东西。

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) { 
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false); 
}
复制代码

现在我们来仔细看看这个接口是做什么的,先来看接口定义,省略了接口中的常量和内部接口,只留下了方法定义:

public interface Spliterator<T> {
//用于遍历单个元素,action是Stream调用终止操作之后包装的sink链
boolean tryAdvance(Consumer&lt;? super T&gt; action);

//用于批量遍历元素,action是Stream调用终止操作之后包装的sink链
default void forEachRemaining(Consumer&lt;? super T&gt; action) {
    do { } while (tryAdvance(action));
}

//并行计算的时候拆分Spliterator
Spliterator&lt;T&gt; trySplit();

//预估元素的大小
long estimateSize();

//精确获取元素的大小
default long getExactSizeIfKnown() {
    return (characteristics() &amp; SIZED) == 0 ? -1L : estimateSize();
}

int characteristics();

default boolean hasCharacteristics(int characteristics) {
    return (characteristics() &amp; characteristics) == characteristics;
}

//获取元素比较器
default Comparator&lt;? super T&gt; getComparator() {
    throw new IllegalStateException();
}

}
复制代码

Spliterator翻译成中文是分离器或者拆分器,接口注释大概的意思是:Spliterator是一个用于遍历和划分源元素的对象,源元素可以来自一个数组、Collection集合、IO Channel或者一个生成器函数。可以通过tryAdvance()方法来遍历源中的单个元素,也可以通过forEachRemaining()方法批量遍历源中的元素。在并行计算中,可以通过trySplit()方法将源中的一些元素拆分为另外的Spliterator。

从注释我们知道Spliterator的主要作用:

现在我们主要来看看方法:

如何实现一个Spliterator

对Spliterator有一个详细的认识之后,我们来看一看如何实现一个Spliterator。

T8M{NF(4Z@3R0@`MB%C395F.png

哇,好多实现类,第一时间是不是比较懵逼,这么多子类不知道从何下手?不过从图中可以看出,Spliterator的子类基本上都是某一个集合的内部类,所以我打算选择两个常用子类详细讲解,大家也可以根据这种思路分析其它的。

ArraySpliterator

ArraySpliterator是Spliterators的一个内部类,每次通过一个数组构建Stream时,都会创建相应的ArraySpliterator对象。Arrays#stream()方法的调用流程:

public static <T> Stream<T> stream(T[] array) {
    return stream(array, 0, array.length);
}

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {
return Spliterators.spliterator(array, startInclusive, endExclusive,
Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
复制代码

Spliterators#spliterator()工厂方法用于创建ArraySpliterator对象:

public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex,
                                             int additionalCharacteristics) {
    checkFromToBounds(Objects.requireNonNull(array).length, fromIndex, toIndex);
    //返回ArraySpliterator对象
    return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics);
}
复制代码

ArraySpliterator字段分析:

//源数组
private final Object[] array;
//当前索引
private int index;        // current index, modified on advance/split
//终止索引
private final int fence;  // one past last index
private final int characteristics;
复制代码

ArraySpliterator构造函数

public ArraySpliterator(Object[] array, int additionalCharacteristics) {
    this(array, 0, array.length, additionalCharacteristics);
}

public ArraySpliterator(Object[] array, int origin, int fence, int additionalCharacteristics) {
this.array = array;
this.index = origin;
this.fence = fence;
//这里要重点关注一下ArraySpliterator源元素大小是确定的
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
}
复制代码

通过构造函数可以看出,默认情况下没有指明origin和fence时,就是从0开始,到数组尾部结束,数组中的元素都算作流元素。

ArraySpliterator方法分析

源码比较简单,直接在代码上用注释说明了,不再单独讲解。

public void forEachRemaining(Consumer<? super T> action) {
    Object[] a; int i, hi; // hoist accesses and checks from loop
    //判空
    if (action == null)
        throw new NullPointerException();
    //数组长度大于等于fence变量
    //index变量大于等于0
    //修改index变量且当前下标小于hi
    if ((a = array).length >= (hi = fence) &&
        (i = index) >= 0 && i < (index = hi)) {
        //循环消费数组元素
        do { action.accept((T)a[i]); } while (++i < hi);
    }
}
复制代码
public boolean tryAdvance(Consumer<? super T> action) {
    //判空
    if (action == null)
        throw new NullPointerException();
    //当前下标大于等于0且小于fence
    //数组中才有剩余可访问的元素
    if (index >= 0 && index < fence) {
        //取元素,index自增
        @SuppressWarnings("unchecked") T e = (T) array[index++];
        //消费元素
        action.accept(e);
        return true;
    }
    //返回false代表没有源元素了
    return false;
}
复制代码
//就是fence减去index,代表数组中剩余的元素大小
public long estimateSize() { return (long)(fence - index); }
复制代码

ArrayListSpliterator

ArrayListSpliterator是ArrayList的内部类,调用ArrayList#stream()方法时会创建这样一个对象。

ArrayList继承自Collection的stream()方法:

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}
复制代码

重写了spliterator()方法,这个方法会返回一个Spliterator对象,可以看到创建的就是ArrayListSpliterator,同时传入了ArrayList自身:

public Spliterator<E> spliterator() {
    return new ArrayListSpliterator<>(this, 0, -1, 0);
}
复制代码

那我们来着重研究下ArrayListSpliterator的源码。

ArrayListSpliterator字段分析

//源元素集合
private final ArrayList<E> list;
//当前下标
private int index; // current index, modified on advance/split
//结束下标
private int fence; // -1 until used; then one past last index
private int expectedModCount; // initialized when fence set
复制代码

ArrayListSpliterator方法分析

public void forEachRemaining(Consumer<? super E> action) {
    int i, hi, mc; // hoist accesses and checks from loop
    ArrayList<E> lst; Object[] a;
    //判空
    if (action == null)
        throw new NullPointerException();
    //ArrayList不为空且其中存放元素的数组不能为空,很容易理解
    if ((lst = list) != null && (a = lst.elementData) != null) {
        //一般来说进入这个分支,因为创建ArrayListSpliterator时,fence是-1
        if ((hi = fence) < 0) {
            mc = lst.modCount;
            //hi就是ArrayList的元素大小
            hi = lst.size;
        }
        else
            mc = expectedModCount;
        //修改index变量
        if ((i = index) >= 0 && (index = hi) <= a.length) {
            for (; i < hi; ++i) {
                //遍历list中的元素数组,取依次取上面的元素,调用action消费
                @SuppressWarnings("unchecked") E e = (E) a[i];
                action.accept(e);
            }
            //校验modCount,不允许方法执行时内部结构发生改变
            if (lst.modCount == mc)
                return;
        }
    }
    throw new ConcurrentModificationException();
}
复制代码
public boolean tryAdvance(Consumer<? super E> action) {
    //判空
    if (action == null)
        throw new NullPointerException();
    //这里hi其实就是ArrayList元素大小
    int hi = getFence(), i = index;
    if (i < hi) {
        index = i + 1;
        //取数组中index下标的元素
        @SuppressWarnings("unchecked") E e = (E)list.elementData[i];
        //调用action消费元素
        action.accept(e);
        //并发修改校验
        if (list.modCount != expectedModCount)
            throw new ConcurrentModificationException();
        return true;
    }
    //没有剩余的元素,返回false
    return false;
}
复制代码

tryAdvance()中是通过将getFence()的返回值赋值给hi的,进入这个方法看下:

private int getFence() { // initialize fence to size on first use
    int hi; // (a specialized variant appears in method forEach)
    ArrayList<E> lst;
    //首次被调用时会进入,将ArrayList.size赋值给fence,ArrayList.modCount赋值给expectedModCount,
    //其它时候直接返回fence的值
    if ((hi = fence) < 0) {
        if ((lst = list) == null)
            hi = fence = 0;
        else {
            expectedModCount = lst.modCount;
            hi = fence = lst.size;
        }
    }
    return hi;
}
复制代码
//估计值大小,ArrayList也是元素大小确定的,计算逻辑是ArrayList.size减去当前下标,表示还有多少源元素
public long estimateSize() {
    return (long) (getFence() - index);
}
复制代码

根据Spliterator创建Stream

通过前面分析我们知道Stream都是通过StreamSupport这个工具类创建的,传入的Spliterator参数就是上面讲解的Spliterator实现类实例:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}
复制代码

返回的实际上是一个ReferencePipeline.Head对象,我在上一个章节中有详细的讲解,现在我们再来分析一下加深印象。先看一下它的类继承关系:

Head.png Head是ReferencePipeline的内部类,同时又继承了ReferencePipeline,ReferencePipeline是引用类型Stream的抽象,它实现了Stream接口,拥有中间操作和终止操作的能力,ReferencePipeline也继承了AbstractPipeline,这个抽象类上一章节有详细讲解。

Head的构造函数调用ReferencePipeline构造函数:

Head(Spliterator<?> source,
     int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}

ReferencePipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
复制代码

最终调用AbstractPipeline构造函数,上一章也有详细讲解,这里再回顾一下:

AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    //头结点没有前一个节点了
    this.previousStage = null;
    //代表源元素的Spliterator
    this.sourceSpliterator = source;
    //sourceStage变量指向自己
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    //深度为0
    this.depth = 0;
    //标识是不是并行流,默认为false
    this.parallel = parallel;
}
复制代码

总结一下,通过StreamSupport#stream()传入Spliterator创建Stream,实际上返回的都是ReferencePipeline.Head对象,它代表源阶段的Stream,也是Pipeline链表的头结点。

Spliterator调用时机

前面分析了Spliterator和创建Stream的过程之后,还有一个疑问:Spliterator是在什么地方被使用到的呢?

先说结论:Spliterator是在Stream调用终止操作的时候触发它的方法调用。其实这很容易理解,因为Stream是惰性流,创建和中间操作的时候什么都不会做,只有终止操作时才调用声明的处理数据的lambda表达式。

看过上一章节的小伙伴应该还记得终止操作都会调用AbstractPipeline#evaluate()方法:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
return isParallel()
       ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
       : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

}
复制代码

经过一系列调用,进入AbstractPipeline#copyInto()方法:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
//非短路操作
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {

    //调用spliterato#getExactSizeIfKnown()获取源元素精确大小,然后传递给sink链表的begin()方法
  wrappedSink.begin(spliterator.getExactSizeIfKnown());
    //调用spliterator#forEachRemaining()批量遍历源元素
    spliterator.forEachRemaining(wrappedSink);
    wrappedSink.end();
}
//短路操作
else {
    copyIntoWithCancel(wrappedSink, spliterator);
}

}
复制代码

终止操作是非短路操作的,在copyInto()中调用Spliterato#getExactSizeIfKnown()方法,会间接调用Spliterato#estimateSize(),然后调用Spliterato#forEachRemaining()批量遍历源元素。

终止操作是短路操作的,会再调用AbstractPipeline#copyIntoWithCancel()方法:

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    @SuppressWarnings({"rawtypes","unchecked"})
    AbstractPipeline p = AbstractPipeline.this;
    //取Pipeline链表头节点
    while (p.depth > 0) {
        p = p.previousStage;
    }
    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    p.forEachWithCancel(spliterator, wrappedSink);
    wrappedSink.end();
}
复制代码

ReferencePipeline#forEachWithCancel()方法:

final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
复制代码

短路操作在do-while循环中调用tryAdvance()方法遍历Spliterator的单个源元素。

总结

本文首先介绍了Spliterator的含义,详细讲解了每一个方法的作用,然后通过ArraySpliterator和ArrayListSpliterator分析如何实现一个Spliterator接口,再次回顾了代表源Stream的Head类,最后分析了Spliterator方法的调用时机。

写在最后

Stream的数据处理逻辑都是通过lambda表达式定义的,它是一种声名式编程,与命令式编程不同。命令式编程比如传统的集合遍历迭代很好调试,而Stream很难调试,出了问题很多时候无从下手。通过阅读源码,了解原理,可以帮助我们更容易调试代码,定位问题。

本系列文章是以专栏的形式发布的,上下文多有关联,如果跳着阅读,可能会产生不连贯的感觉,所以建议按照顺序阅读。另外由于是源码分析,跟其他类型文章不同,所以建议在阅读的时候跟着思路亲自动手调试源码。

最后,原创不易,如果觉得本系列文章对您有帮助,能够加深您对Stream原理和源码的理解的话,请不要吝啬您手中的赞(✪ω✪)!

来源:https://juejin.cn/post/7101217470542774308

标签:index,fence,Stream,int,精讲,元素,Spliterator,源码
来源: https://www.cnblogs.com/konglxblog/p/16476003.html