编程语言
首页 > 编程语言> > ☕【Java技术指南】「并发编程专题」Fork/Join框架基本使用和原理探究(原理及源码篇)

☕【Java技术指南】「并发编程专题」Fork/Join框架基本使用和原理探究(原理及源码篇)

作者:互联网

ForkJoin线程池框架回顾

ForkJoinPool的类架构图

ForkJoinPool核心类实现

ForkJoinPool,所有线程和WorkQueue共享,用于工作窃取、任务状态和工作状态同步。

核心属性介绍

ForkJoinTask

WorkQueue

workQueue: 当前线程的任务队列,与WorkQueue的owner呼应


ForkJoinTask是能够在ForkJoinPool中执行的任务抽象类,父类是Future,具体实现类有很多,这里主要关注RecursiveAction和RecursiveTask。

只需要实现其compute()方法,在compute()中做最小任务控制,任务分解(fork)和结果合并(join)。

ForkJoinWorkerThread

ForkJoinPool中执行的默认线程是ForkJoinWorkerThread,由默认工厂产生,可以自己重写要实现的工作线程。同时会将ForkJoinPool引用放在每个工作线程中,供工作窃取时使用。


源码分析

ForkJoinPool构造函数

ForkJoinPool有四个构造函数,其中参数最全的那个构造函数如下所示:

public ForkJoinPool(int parallelism,
                            ForkJoinWorkerThreadFactory factory,
                            UncaughtExceptionHandler handler,
                            boolean asyncMode)

当asyncMode设置为true的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false

......
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
......
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
使用案例
ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

先看ForkJoinPool的创建过程,这个比较简单,创建了一个ForkJoinPool对象,带有默认ForkJoinWorkerThreadFactory,并行数跟机器核数一样,同步模式。

提交任务

forkJoinPool.invoke(new CountRecursiveTask(1, 100));会先执行到ForkJoinPool#externalPush中,此时forkJoinPool.workQueues并没有完成初始化工作,所以执行到ForkJoinPool#externalSubmit。

externalSubmit

这里是一个for无限循环实现,跳出逻辑全部在内部控制,主要结合runState来控制。

  1. 建ForkJoinPool的WorkQueue[]变量workQueues,长度为大于等于2倍并行数量的且是2的n次幂的数。这里对传入的并行数量使用了位运算,来计算出workQueues的长度。

  2. 创建一个WorkQueue变量q,q.base=q.top=4096,q的owner为null,无工作线程,放入workQueues数组中

  3. 创建q.array对象,长度8192,将ForkJoinTask也就是代码案例中的CountRecursiveTask放入q.array,pool为传入的ForkJoinPool,并将q.top加1,完成后q.base=4096,q.top=4097。然后执行ForkJoinPool#signalWork方法。(base下标表示用来取数据的,top下标表示用来放数据的,当base小于top时,说明有数据可以取)

externalSubmit主要完成3个小步骤工作,每个步骤都使用了锁的机制来处理并发事件,既有对runState使用ForkJoinPool的全局锁,也有对WorkQueue使用局部锁。

signalWork

signalWork方法的签名是:void signalWork(WorkQueue[] ws, WorkQueue q)。ws为ForkJoinPool中的workQueues,q为externalSubmit方法中新建的用于存放ForkJoinTask的WorkQueue.

ForkJoinWorkerThread#run
	WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
	int b, n; long c;
	//如果pool.workQueues即ws的k下标元素不为空
	if ((q = ws[k]) != null) {
		//如果base<top且array不为空,则说明有元素。为什么还需要array不为空才说明有元素?
		//从下面可以知道由于获取元素后才会设置base=base+1,所以可能出现上一个线程拿到元素了但是没有及时更新base
	    if ((n = (b = q.base) - q.top) < 0 &&
	        (a = q.array) != null) {      // non-empty
	        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
	        //这里使用getObjectVolatile去获取当前WorkQueue的元素
	        //volatile是保证线程可见性的,也就是上一个线程可能已经拿掉了,可能已经将这个任务置为空了。
	        if ((t = ((ForkJoinTask<?>)
	                  U.getObjectVolatile(a, i))) != null &&
	            q.base == b) {
	            if (ss >= 0) {
	            		//拿到任务之后,将array中的任务用CAS的方式置为null,并将base加1
	                if (U.compareAndSwapObject(a, i, t, null)) {
	                    q.base = b + 1;
	                    if (n < -1)       // signal others
	                        signalWork(ws, q);
	                    return t;
	                }
	            }
	            else if (oldSum == 0 &&   // try to activate
	                     w.scanState < 0)
	                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
	        }
	        if (ss < 0)                   // refresh
	            ss = w.scanState;
	        r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
	        origin = k = r & m;           // move and rescan
	        oldSum = checkSum = 0;
	        continue;
	    }
	    checkSum += b;
	}
CountRecursiveTask#compute

重写compute方法一般需要遵循这个规则来写

if(任务足够小){
  直接执行任务;
  如果有结果,return结果;
}else{
  拆分为2个子任务;
  分别执行子任务的fork方法;
  执行子任务的join方法;
  如果有结果,return合并结果;
}
public final ForkJoinTask<V> fork() {
        Thread t;
        //如果是工作线程,则往自己线程中的workQuerue中添加子任务;否则走首次添加逻辑
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

ForkJoinPool.WorkQueue#push方法会将当前子任务存放到array中,并调用ForkJoinPool#signalWork添加线程或等待其他线程去窃取任务执行。过程又回到前面讲到的signalWork流程。

ForkJoinTask#externalAwaitDone
ForkJoinTask#join

来看left.join() + right.join(),在将left和right的Task放置在当前工作线程的workQueue之后,执行join()方法,join()方法最终会在ForkJoinPool.WorkQueue#tryRemoveAndExec中将刚放入的left取出,将对应workQueue中array的left任务置为空,然后执行left任务。然后执行到left的compute方法。对于right任务也是一样,继续子任务的fork和join工作,如此循环往复。

	public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

当工作线程执行结束后,会执行getRawResult,拿到结果。

Work-Steal算法

相比其他线程池实现,这个是ForkJoin框架中最大的亮点。当空闲线程在自己的WorkQueue没有任务可做的时候,会去遍历其他的WorkQueue,并进行任务窃取和执行,提高程序响应和性能。

取2的n次幂作为长度的实现
	//代码位于java.util.concurrent.ForkJoinPool#externalSubmit
    if ((rs & STARTED) == 0) {
        U.compareAndSwapObject(this, STEALCOUNTER, null,
                               new AtomicLong());
        // create workQueues array with size a power of two
        int p = config & SMASK; // ensure at least 2 slots
        int n = (p > 1) ? p - 1 : 1;
        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
        workQueues = new WorkQueue[n];
        ns = STARTED;
    }

这里的p其实就是设置的并行线程数,在为ForkJoinPool创建WorkQueue[]数组时,会对传入的p进行一系列位运算,最终得到一个大于等于2p的2的n次幂的数组长度

内存屏障
	//代码位于java.util.concurrent.ForkJoinPool#externalSubmit
    if ((a != null && a.length > s + 1 - q.base) ||
        (a = q.growArray()) != null) {
        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
        //通过Unsafe进行内存值的设置,高效,且屏蔽了处理器和Java编译器的指令乱序问题
        U.putOrderedObject(a, j, task);
        U.putOrderedInt(q, QTOP, s + 1);
        submitted = true;
    }

这里在对单个WorkQueue的array进行push任务操作时,先后使用了putOrderedObject和putOrderedInt,确保程序执行的先后顺序,同时这种直接操作内存地址的方式也会更加高效。

高并发:细粒度WorkQueue的锁

	//代码位于java.util.concurrent.ForkJoinPool#externalSubmit
	//如果qlock为0,说明当前没有其他线程操作改WorkQueue
	//尝试CAS操作,修改qlock为1,对这个WorkQueue进行加锁
    if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a = q.array;
        int s = q.top;
        boolean submitted = false; // initial submission or resizing
        try {                      // locked version of push
            if ((a != null && a.length > s + 1 - q.base) ||
                (a = q.growArray()) != null) {
                int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                submitted = true;
            }
        } finally {
        	  //finally将qlock置为0,进行锁的释放,其他线程可以使用
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        if (submitted) {
            signalWork(ws, q);
            return;
        }
    }

这里对单个WorkQueue的array进行push任务操作时,使用了qlock的CAS细粒度锁,让并发只落在一个WOrkQueue中,而不是整个pool中,极大提高了程序的并发性能,类似于ConcurrentHashMap。

标签:Fork,ForkJoinPool,ForkJoinTask,任务,源码,线程,WorkQueue,原理,array
来源: https://www.cnblogs.com/liboware/p/15316269.html