编程语言
首页 > 编程语言> > AQS的源码分析

AQS的源码分析

作者:互联网

AQS

因为AQS核心方法其实是抽象方法,所以通过他的一个实现ReentrantLock来进行分析

在JDK1.5之前,一般是靠synchronized关键字来实现线程对共享变量的互斥访问。是在字节码上加指令,依赖于底层操作系统实现。直到AbstractQueuedSynchronizer(AQS)组件被开发出来,仅有原生的Java语句就实现了。也就是说AQS是Java并发中用以解决多线程访问共享资源问题的同步机制的基本的框架(或者说是一种规范),为Java并发同步组件提供统一的底层支持

特点

双向队列执行方式

基础状态

添加一个节点:

删除一个节点

核心步骤的源码分析

使用的是ReentrantLock执行流程来说明AQS加锁与解锁,其中关于公平与不公平竞争就是表示在队列中有阻塞线程时能够能进行插队。那么其中ReentrantLock就是一个典型的不公平锁的实现。

加锁

上图为ReentrantLock加锁的过程,首先调用lock()方法进行加锁。

final void lock() {
    /*
    * 根据之前所说,state的状态表明了当前是否有资源同不同意加锁,通过CAS原子操作尝试获取锁
    * 当无竞争时(state==0)在获取锁后就会将state改为1,然后将当前线程的执行者位设置为当前
    * 线程。类似于Monitor的Owner的屋
    */
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        /*
        * 如果已经出现冲突,且未拿到锁对象,就需要调用此方法尝试进行获取锁。
        * 参数1代表每次锁重入时state累计增加的值。当一个线程多次获取锁
        * 则state变为n 
        */
        acquire(1);
}

此时如果调用了acquire方法说明你第一次并没有获取到锁,这个时候就需要去尝试获取锁。此时ReentrantLock将尝试获取锁的步骤分为2步。再次尝试获取锁,等待阻塞队列释放。

public final void acquire(int arg) {
	//再次尝试获取锁
    if (!tryAcquire(arg) &&
        //等待阻塞队列释放
        acquireQueued(
            //添加到等待队列
            addWaiter(Node.EXCLUSIVE), arg)
       )
        //------------
        selfInterrupt();
}

一步一步来看,首先是查看再次尝试获取锁的源码。

因为我们使用的ReentrantLock来查看,所以这里tryAcquire的实现在ReentrantLock使用的nonfairTryAcquire方法

final boolean nonfairTryAcquire(int acquires) {
    //获取当前执行的线程
    final Thread current = Thread.currentThread();
    /*
    * 判断执行到这一步时,是否可以获取锁
    * 此处也体现ReentrantLock不公平锁的特点,如果此时队列有线程阻塞
    * 当前线程未加入队列,且state=0此处就可以进行插队。
    */
    int c = getState();
    if (c == 0) {
        //如果cas也成功,插队成功,当前线程获取执行权
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            //此时返回true,将不在执行后续操作
            return true;
        }
    }
    /*
    * 如果当前锁是被占用的,就判断拥有锁对象的线程与当前线程是否是同一个
    * 是同一个进行锁重入,不是那么就要进入队列开始安心排队了
    */
    else if (current == getExclusiveOwnerThread()) {
        //对state状态进行一个累加,到时候释放也是递减。直到state为0后真正释放锁
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

此时如果再次尝试获取锁还是没有成功,那么就要开始去排队去了。首先通过addWaiter将线程包装层一个Node节点。

因为addWaiter代码不是重点,就直接讲流程。注意:初始化的队列头尾是一个所有变量为NULL的Node对象

graph LR A(创建Node节点) B(获取当前的尾节点) C(通过CAS操作将新节点添加到队列尾部) D(调用Enq方法初始化队列) F(队列是否初始化) G(CAS初始化队列) H(CAS合并新阶点) RE(返回新节点) A-->B B--队列已经初始化-->C B--队列未被初始化-->D D-->F F--尾节点为空-->G F--尾节点不为空-->H G--循环-->F C-->RE H-->RE

此时新节点已经被创建出来了,此时就要拿着当前的节点进行无期限的重试模式。

    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //开始无期限重试
        for (;;) {
            /*
            * 首先获取当前节点的前驱节点,因为队列的模型为FIFO,那么下一个出队列
            * 的节点必须是第一个元素。【非头,而是头下一个】
            */
            final Node p = node.predecessor();
            /*
            * 如果当前节点是队列第一个元素,可以尝试再去尝试获取锁。如果获取不到就阻塞
            */
            if (p == head && tryAcquire(arg)) {
                /*
                * 此时为了不必要的一些操作外,以及方便GC的垃圾回收
                * 除了会设置将当前节点设置为头结点外还会将该节点的所有参数却不置为空
                */
                setHead(node);
                p.next = null; // help GC
                //当前操作已经成功,没有失败。
                failed = false;
                return interrupted;
            }
            //如果不是第一个节点,或没有获取锁就执行这个方法进行阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //如果发生异常就会执行这一步
            cancelAcquire(node);
    }
}

阻塞分别是shouldParkAfterFailedAcquireparkAndCheckInterrupt,那么继续一步一步来,但是需要先查看parkAndCheckInterrupt()方法。当shouldParkAfterFailedAcquire()方法返回为TRUE时则会执行parkAndCheckInterrupt()方法。

private final boolean parkAndCheckInterrupt() {
    /*
    * 当之前前面那个方法成功时,就会将当前的线程进行阻塞。这也契合了阻塞队列的节点需要
    * 阻塞的观点
    */
    LockSupport.park(this);
    /*
    * 这里是获取当前线程的打断标记。如果返回True那么在if语句中就会返回true,在回到
    * acquire方法的if语句中,就会打算自身让线程唤醒。这里应该是需要通过调用者手动
    * 修改打断标记,但是仅仅是可能。这里只是猜想
    */
    return Thread.interrupted();
}

当知道了shouldParkAfterFailedAcquire()方法其实就是用来阻塞当前线程时,看前面一个判断的操作就会很明白。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //这里就用到了之前说的waitStatus参数,判断当前节点的前驱节点的waitStatus
    int ws = pred.waitStatus;
    /*
    * AQS唤醒操作时通过一个队列中的节点的释放去唤醒下一个节点
    * 如果waitStatus等于-1即Node.SIGNAL状态,表示这个节点可以唤醒一个节点
    * 那么这里就需要判断在当前线程阻塞时判断前驱节点能不能唤醒我们,不能需要修改前驱节点标记
    */
    if (ws == Node.SIGNAL)
        return true;
    /*
    * 下面的操作就是修改前驱节点的waitstates为-1,但是可能有些节点已经废弃了。需要先
    * 将废弃的踢出,然后在修改最近的有效的节点的waitstates
    */
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
       compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

解锁

此时已经了解加锁的全部过程,简单来说就是一个通过state来声明此时有没有占用,通过CAS操作保证一个原子性操作。然后阻塞的队列都放进队列中等着,等着被唤醒。了解了这些那么解锁的过程其实也变得相对简单而容易。

首先通过调用ReentranLockunlock()方法进行解锁。

public final boolean release(int arg) {
    //尝试解锁,如果当前被锁重入了,这里解锁就会返回false。知道解最后一层锁
    if (tryRelease(arg)) {
        //当解锁后需要判断队列是否有阻塞的线程,有需要通过下面的步骤进行唤醒
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先看看重入锁的解刨过程

protected final boolean tryRelease(int releases) {
    //这一步非常简单,就是递减判断state是否为0,是就解锁,不是就不解锁
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

如果在tryRelease()已经释放锁,就需要尝试将下一个队列的元素进行释放

private void unparkSuccessor(Node node) {
    //因为传进来的是头结点,此时需要严谨,必须修改头结点waitstate的状态才能继续
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
	//这里不能head.next获取第一个元素是因为之前帮助GC回收的时候将next设置为了null
    //需要逆序查找
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //获取到了第一个元素直接唤醒即可。
    if (s != null)
        LockSupport.unpark(s.thread);
}

错误处理

最后补充一个错误处理,即加锁过程中acquireQueued()方法在变量failedtrue会执行的操作。当遇到异常在无法让failed改变的时候就会用调用这个错误处理的过程。

private void cancelAcquire(Node node) {
    //首先这个节点为空,直接返回
    if (node == null)
        return;
	//为了防止存储在参数中的线程在别处调用而影响,先置空再说
    node.thread = null;

    /*
    * 这里先让前面的废弃节点舍弃,让当前节点先直接连上最近一个有用的节点。
    * 这一步是有用的,他需要将没有的废弃然后才好拼接
    */
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    
    Node predNext = pred.next;

    //这里设置成废弃,用处和之前那个一样,不要影响其他线程的调用。方便直接跳过
    node.waitStatus = Node.CANCELLED;

    /*
    *	这里有3种可能:
    *		1、该节点是尾节点,pre->null
    *		2、该节点是头结点, 释放下一个节点。头结点表示当前线程获取锁
    *		3、该节点pre不是头结点,当也不是尾节点。需要保证pre的WaitStatus状态时-1,
    *			保证能够之后唤醒next,然后就是一个pre->next操作
    */
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

尝试与编码

通过对大哥李(Doug Lea)的AQS基本的了解,那么也要尝试编写一个简单的的ReentrantLock来巩固对源码的理解。我就是想简单的加锁解锁的功能,不在尝试锁重入和waitstates的实现

/**
 * @author musiro
 * @since 2022/7/16 12:28
 */
@Data
final class MyAbstractQueuedSynchronizer{

    final class Node {

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException();
            }
            else {
                return p;
            }
        }

        public Node(Thread thread) {
            this.thread = thread;
        }

        /**
         * 大哥李的思想,帮助GC垃圾回收
         */
        public void helpGC(){
            prev = null;
            thread = null;
            next = null;
        }
    }

    private volatile int state;

    private volatile Node head;

    private volatile Node tail;

    private Thread owner;

    public MyAbstractQueuedSynchronizer() {

    }

    public void acquire(){
        if(!tryAcquire()){
            acquireQueued(addWaiter());
        }
    }

    public void release(){
        //因为没有锁重入操作,所以不需要判断state的值,可以直接释放锁。
        //注意执行到这一步必定是已经获取了锁
        setState(0);
        setOwner(null);
        //判断当前的线程是否为队列中的节点,有就释放
        Node head = getHead();
        if(head != null && head != tail){
            Node temp = getTail();
            while (temp.predecessor() != head){
                temp = temp.predecessor();
            }
            LockSupport.unpark(temp.thread);
        }
    }

    private boolean tryAcquire(){
        //获取当前线程
        Thread thread = Thread.currentThread();
        //先判断state的状态
        int v = getState();
        //判断能不能进行加锁
        if(v == 0 && unsafe.compareAndSwapInt(this,stateOffset,0,1)){
            setOwner(Thread.currentThread());
            return true;
        }
        //锁重入操作就不编写了
        return false;
    }

    private void acquireQueued(Node node){
        //尝试加入队列
        while (true){
            final Node pre = node.predecessor();
            //判断是否是第一个元素,是的话就要轮询获取锁
            if(pre == head && tryAcquire()){
                //获取了锁开始释放,根据大哥李建议是帮助GC释放
                //注意这里是不需要让队列的下一个进行开始激活轮询获取锁的,等待这个线程激活的时候再开始
                //会更加适合
                node.helpGC();
                setHead(node);
                //释放,让他执行线程后续逻辑
                return;
            }
            //如果不是最后一个,就需要阻塞,根据AQS的思想需要将waitStates设置为-1,且需要释放一些
            //废的线程,但是因为是简单重写这里都不进行尝试编写。
            LockSupport.park();
        }
    }

    private Node addWaiter(){
        Node node = new Node(Thread.currentThread());
        Node prev = null;
        //判断是否初始化
        while ((prev = getTail()) == null){
            //创建一个空节点充当头和尾
            if (unsafe.compareAndSwapObject(this,headOffset,null,new Node(null))){
                tail = head;
                break;
            }
        }
        //将尾巴和新尾巴进行拼接
        node.prev = prev;
        while (true){
            if (unsafe.compareAndSwapObject(this, tailOffset, prev, node)) {
                node.prev = prev;
                prev.next = node;
                return node;
            }
            prev = getTail();
        }
    }

    private static final Unsafe unsafe;
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    static {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = (Unsafe) field.get(null);
            stateOffset = unsafe.objectFieldOffset(MyAbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(MyAbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(MyAbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("unsafe init fail");
        }
    }

}

最后验证一波发现能够完美实现阻塞以及及时的唤醒操作

public static void main(String[] args) {
        MyAbstractQueuedSynchronizer aqs = new MyAbstractQueuedSynchronizer();
        new Thread(()->{
            log.info("线程1执行");
            BaseUtil.sleep(3);
            aqs.acquire();
            try {
                log.info("线程1结束");
            }finally {
                aqs.release();
            }
        },"线程1").start();

        new Thread(()->{
            aqs.acquire();
            try {
                log.info("线程2执行");
                BaseUtil.sleep(2);
                log.info("线程2结束");
            }finally {
                aqs.release();
            }
        },"线程2").start();

        new Thread(()->{
            aqs.acquire();
            try {
                log.info("线程3执行");
                BaseUtil.sleep(2);
                log.info("线程3结束");
            }finally {
                aqs.release();
            }
        },"线程3").start();
    }
/*
*  结果:
2022-07-16 16:45:57 [线程2]-----线程2执行
2022-07-16 16:45:57 [线程1]-----线程1执行
2022-07-16 16:45:59 [线程2]-----线程2结束
2022-07-16 16:45:59 [线程3]-----线程3执行
2022-07-16 16:46:01 [线程3]-----线程3结束
2022-07-16 16:46:01 [线程1]-----线程1结束
*/

结束语

初次学习难免有误,如果出错还请大佬点出,我会及时更正。
参考文献也是我很推荐的,不然小白我也写不出这篇博客

参考文献

(九)深入分析AQS实现原理

标签:分析,Node,node,AQS,队列,源码,线程,null,节点
来源: https://www.cnblogs.com/musiro/p/16484595.html