其他分享
首页 > 其他分享> > JUC学习之共享模型工具之JUC并发工具包上

JUC学习之共享模型工具之JUC并发工具包上

作者:互联网

JUC学习之共享模型工具之JUC并发工具包


AQS 原理

概述

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:

getState - 获取 state 状态

setState - 设置 state 状态

compareAndSetState - cas 机制设置 state 状态

独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源


子类主要实现这样一些方法(默认抛出 UnsupportedOperationException

获取锁的姿势

// 如果获取锁失败
if (!tryAcquire(arg)) {
 // 入队, 可以选择阻塞当前线程 park unpark
}

释放锁的姿势

// 如果释放锁成功
if (tryRelease(arg)) {
 // 让阻塞线程恢复运行
}

实现不可重入锁

自定义同步器

package schedule;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

//自定义锁(不可重入锁)
final class MySync extends AbstractQueuedSynchronizer {

   /**
   @param acquires 可重入数用来计数的,因为是不可重入锁,因此如果这里acquires的值大于一就返回false,表示加锁失败
    */
    @Override
    protected boolean tryAcquire(int acquires) {
        if (acquires == 1) {
            //尝试加锁
            if (compareAndSetState(0, 1)) {
                //加上了锁,并设置owner为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
        }
        //加锁失败
        return false;
    }

    /**
     @param acquires 可重入数用来计数的,因为是不可重入锁,因此如果这里acquires的值大于一就返回false,表示解锁失败
     */
    @Override
    protected boolean tryRelease(int acquires) {
        if (acquires == 1) {
            //当前为没有上锁的状态
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            //设置当前没有线程占用锁
            setExclusiveOwnerThread(null);
            //解锁--- private volatile int state
            //因为state是volatile,因此将setState(0);方法执行放在setExclusiveOwnerThread(null);方法执行之前可以确保
            //不会产生指令重排; 确保线程可见性;----exclusiveOwnerThread不是volatile
            setState(0);
            return true;
        }
        return false;
    }

    //条件变量
    protected Condition newCondition() {
        return new ConditionObject();
    }

    //是否持有独占锁--1:持有; 0不持有
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
}

同步器类中的大部分方法由其父类提供


自定义锁

有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

package schedule;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

class MyLock implements Lock {
 static MySync sync = new MySync();

 @Override
 // 尝试,不成功,进入等待队列
 public void lock() {
  sync.acquire(1);
 }

 @Override
 // 尝试,不成功,进入等待队列,可打断
 public void lockInterruptibly() throws InterruptedException {
  sync.acquireInterruptibly(1);
 }

 @Override
 // 尝试一次,不成功返回,不进入队列
 public boolean tryLock() {
  return sync.tryAcquire(1);
 }

 @Override
 // 尝试,不成功,进入等待队列,有时限
 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  return sync.tryAcquireNanos(1, unit.toNanos(time));
 }

 @Override
 // 释放锁
 public void unlock() {
  sync.release(1);
 }

 @Override
 // 生成条件变量
 public Condition newCondition() {
  return sync.newCondition();
 }
}

release和tryRelease的区别:

在这里插入图片描述


测试一下

       MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
                sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t1").start();

        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t2").start();

输出

22:29:28.727 c.TestAqs [t1] - locking... 
22:29:29.732 c.TestAqs [t1] - unlocking... 
22:29:29.732 c.TestAqs [t2] - locking... 
22:29:29.732 c.TestAqs [t2] - unlocking...

不可重入测试

如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)

lock.lock();
log.debug("locking...");
lock.lock();
log.debug("locking...");

心得

起源

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。

目标

AQS 要实现的功能目标

要实现的性能目标

Instead, the primary performance goal here is scalability: topredictably maintain efficiency even, or especially, when
synchronizers are contended.


设计

AQS 的基本思想其实很简单

获取锁的逻辑

while(state 状态不允许获取) {
 if(队列中还没有此线程) {
 入队并阻塞
 }
}
当前线程出队

释放锁的逻辑

if(state 状态允许了) {
 恢复阻塞的线程(s) }

要点


1) state 设计


2) 阻塞恢复设计


3) 队列设计

使用了 FIFO 先入先出队列,并不支持优先级队列

设计时借鉴了 CLH 队列,它是一种单向无锁队列

在这里插入图片描述
队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态

入队伪代码,只需要考虑 tail 赋值的原子性

do {
 // 原来的 tail
 Node prev = tail;
 // 用 cas 在原来 tail 的基础上改为 node
} while(tail.compareAndSet(prev, node))

出队伪代码

// prev 是上一个节点
while((Node prev=node.prev).state != 唤醒状态) {
}
// 设置头节点
head = node;

CLH 好处:

AQS 在一些方面改进了 CLH

        private Node enq ( final Node node){
            for (; ; ) {
                Node t = tail;
                // 队列中还没有元素 tail 为 null
                if (t == null) {
                    // 将 head 从 null -> dummy
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 将 node 的 prev 设置为原来的 tail
                    node.prev = t;
                    // 将 tail 从原来的 tail 设置为 node
                    if (compareAndSetTail(t, node)) {
                        // 原来 tail 的 next 设置为 node
                        t.next = node;
                        return t;
                    }
                }
            }
        }

主要用到 AQS 的并发工具类

在这里插入图片描述


ReentrantLock 原理

在这里插入图片描述


非公平锁实现原理

加锁流程

先从构造器开始看,默认为非公平锁实现

public ReentrantLock() {
 sync = new NonfairSync();
}

NonfairSync 继承自 AQS

没有竞争,加锁成功的情况:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


第一个竞争出现时

在这里插入图片描述

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
  3. 接下来进入 addWaiter 逻辑,构造 Node 队列

在这里插入图片描述
当前线程进入 acquireQueued 逻辑
4. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
5. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
6. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

在这里插入图片描述
7. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时
state 仍为 1,失败

8.当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回
true

9.进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

在这里插入图片描述
再次有多个线程经历上述过程竞争失败,变成这个样子

在这里插入图片描述


解锁流程

Thread-0 释放锁,进入 tryRelease 流程,如果成功

在这里插入图片描述

在这里插入图片描述
如果加锁成功(没有竞争),会设置

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

在这里插入图片描述

如果不巧又被 Thread-4 占了先


加锁流程源码分析:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

加锁源码

    // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        // 加锁实现
        final void lock() {
            // 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
            if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread());
            else
                // 如果尝试失败,进入 ㈠
                acquire(1);
        }

        // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquire(int arg) {
            // ㈡ tryAcquire 
            if (!tryAcquire(arg) &&
                    // 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
                selfInterrupt();
            }
        }

        // ㈡ 进入 ㈢
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        // ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 如果还没有获得锁
            if (c == 0) {
                // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                // state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 获取失败, 回到调用处
            return false;
        }

        // ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
        private Node addWaiter(Node mode) {
            // 将当前线程关联到一个 Node 对象上, 模式为独占模式
            Node node = new Node(Thread.currentThread(), mode);
            // 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    // 双向链表
                    pred.next = node;
                    return node;
                }
            }
            // 尝试将 Node 加入 AQS, 进入 ㈥
            enq(node);
            return node;
        }

        // ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
        private Node enq(final Node node) {
            for (; ; ) {
                Node t = tail;
                if (t == null) {
                    // 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
                    if (compareAndSetHead(new Node())) {
                        tail = head;
                    }
                } else {
                    // cas 尝试将 Node 对象加入 AQS 队列尾部
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

        // ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (; ; ) {
                    final Node p = node.predecessor();
                    // 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
                    if (p == head && tryAcquire(arg)) {
                        // 获取成功, 设置自己(当前线程对应的 node)为 head
                        setHead(node);
                        // 上一个节点 help GC
                        p.next = null;
                        failed = false;
                        // 返回中断标记 false
                        return interrupted;
                    }
                    if (
                        // 判断是否应当 park, 进入 ㈦
                            shouldParkAfterFailedAcquire(p, node) &&
                                    // park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
                                    parkAndCheckInterrupt()) {
                        interrupted = true;
                    }
                }
            } finally {
                if (failed) cancelAcquire(node);
            }
        }

        // ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获取上一个节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL) {
                // 上一个节点都在阻塞, 那么自己也阻塞好了
                return true;
            }
            // > 0 表示取消状态
            if (ws > 0) {
                // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 这次还没有阻塞
                // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

        // ㈧ 阻塞当前线程
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    }

注意

是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的 waitStatus 决定


解锁流程

在这里插入图片描述
解锁不区分公平和非公平

在这里插入图片描述
被唤醒的线程,恢复到最初被park阻塞的位置:

在这里插入图片描述

源码:

    // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
        // 解锁实现
        public void unlock() {
            sync.release(1);
        }

        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean release(int arg) {
            // 尝试释放锁, 进入 ㈠
            if (tryRelease(arg)) {
                // 队列头节点 unpark
                Node h = head;
                if (
                    // 队列不为 null
                        h != null &&
                                // waitStatus == Node.SIGNAL 才需要 unpark
                                h.waitStatus != 0) {
                    // unpark AQS 中等待的线程, 进入 ㈡
                    unparkSuccessor(h);
                }
                return true;
            }
            return false;
        }

        // ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryRelease(int releases) {
            // state--
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();
            boolean free = false;
            // 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        // ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
        private void unparkSuccessor(Node node) {
            // 如果状态为 Node.SIGNAL 尝试重置状态为 0
            // 不成功也可以
            int ws = node.waitStatus;
            if (ws < 0) {
                compareAndSetWaitStatus(node, ws, 0);
            }
            // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
            Node s = node.next;
            // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0) s = t;
            }
            if (s != null) LockSupport.unpark(s.thread);
        }
    }

可重入原理

    static final class NonfairSync extends Sync {
        // ...

        // Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                // state++
                //如果此时c=1,表示当前线程已经拥有了这把锁,但是此时出现了锁重入的情况,state计数+1=2
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryRelease(int releases) {
            // state--
            //当前线程state计数减去一
            int c = getState() - releases;
            //如果当前线程没有用有锁,还调用释放锁的方法会抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
                //free变量标记是否释放了锁
            boolean free = false;
            // 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }

可打断原理

不可打断模式

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

   // Sync 继承自 AQS
    static final class NonfairSync extends Sync {
        // ...

        private final boolean parkAndCheckInterrupt() {
            // 如果打断标记已经是 true, 则 park 会失效
            LockSupport.park(this);
            // interrupted 会清除打断标记
            return Thread.interrupted();
        }

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (; ; ) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null;
                        failed = false;
                        // 还是需要获得锁后, 才能返回打断状态
                        return interrupted;
                    }
                    if (
                            shouldParkAfterFailedAcquire(p, node) &&
                            //parkAndCheckInterrupt方法在被唤醒或打断后,返回的是当前线程的打断标记,即打断状态
                            //如果被打断了返回true,会进入if语句设置interrupted的值为true,再次尝试去获取锁
                                    parkAndCheckInterrupt()
                    ) {
                        // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                        interrupted = true;
                    }
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

        public final void acquire(int arg) {
            if (
                    !tryAcquire(arg) &&
                            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) {
                // 如果打断状态为 true
                selfInterrupt();
            }
        }

        static void selfInterrupt() {
            // 重新产生一次中断
            Thread.currentThread().interrupt();
        }
    }

在这里插入图片描述


可打断模式

    static final class NonfairSync extends Sync {
        public final void acquireInterruptibly(int arg) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 如果没有获得到锁, 进入 ㈠
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }

        // ㈠ 可打断的获取锁流程
        private void doAcquireInterruptibly(int arg) throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (; ; ) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                    // //parkAndCheckInterrupt方法在被唤醒或打断后,返回的是当前线程的打断标记,即打断状态
                            //如果被打断了返回true,会进入if语句,然后抛出被打断的异常
                            parkAndCheckInterrupt()) {
                        // 在 park 过程中如果被 interrupt 会进入此
                        // 这时候抛出异常, 而不会再次进入 for (;;)
                        throw new InterruptedException();
                    }
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    }

公平锁实现原理

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquire(int arg) {
            if (
                    !tryAcquire(arg) &&
                            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) {
                selfInterrupt();
            }
        }

        // 与非公平锁主要区别在于 tryAcquire 方法的实现
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean hasQueuedPredecessors() {
            Node t = tail;
            Node h = head;
            Node s;
            // h != t 时表示队列中有 Node
            return h != t &&
                    (
                            // (s = h.next) == null 表示队列中还有没有老二
                            //头结点是哑元,老二节点优先级最高
                            (s = h.next) == null ||
                                    // 或者队列中老二线程不是此线程
                                    s.thread != Thread.currentThread()
                    );
        }
    }

对比非公平锁的实现

// ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 如果还没有获得锁
            if (c == 0) {
                // 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) {
                // state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 获取失败, 回到调用处
            return false;
        }

条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject


await 流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObjectaddConditionWaiter 流程

创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

在这里插入图片描述

await()对应源码

     public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //创建新的Node状态为-2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
            Node node = addConditionWaiter();
             .....
        }

addConditionWaiter()对应源码

     //将等待者加入等待队列,返回新创建的Node等待节点
    private Node addConditionWaiter() 
    {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //新创建的节点关联当前线程,值为-2(Node.CONDITION值为-2)
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //如果等待队列没有节点,那么当前新创建出来的节点作为头部
            //否则作为队列尾部节点
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }


接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

在这里插入图片描述

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

在这里插入图片描述

await()对应源码

     public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //创建新的Node状态为-2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
            Node node = addConditionWaiter();
            //进入fullyRelease(node)释放锁的流程
            int savedState = fullyRelease(node);
            ....
        }

fullyRelease(node)对应源码

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
           //获取当前state的值
            int savedState = getState();
            //释放锁,state值清零,即释放所有锁(可重入锁state值大于一)
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

release(savedState)对应源码

    public final boolean release(int arg) 
    {       
    //释放锁成功
        if (tryRelease(arg)) 
        {
            Node h = head;
              //头结点不为空,并且waitStatus 为-1
            if (h != null && h.waitStatus != 0)
            //唤醒头结点的后继节点--unpark
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease(arg)对应源码

   protected final boolean tryRelease(int releases) 
   {        
            //这里releases的值为int savedState = getState();
            //因此这里c为0,相当于释放了当前线程加的所有锁(可重入锁)
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //锁被释放了
            if (c == 0)
             {
                free = true;
                //设置owner值为null
                setExclusiveOwnerThread(null);
            }
            //解锁
            setState(c);
            return free;
        }

park 阻塞 Thread-0

在这里插入图片描述

await()对应源码

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //创建新的Node状态为-2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
            Node node = addConditionWaiter();
            //进入fullyRelease(node)释放锁的流程
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                //调用park(this)方法,将自己阻塞,进入休息室等待
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
             ...
        }

signal 流程

假设 Thread-1 要来唤醒 Thread-0
在这里插入图片描述

signal()源码

      public final void signal() {
            //当前线程是否是锁的持有者
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //获取等待队列中第一个节点     
            Node first = firstWaiter;
            if (first != null)
            //如果第一个节点不为空,就唤醒第一个节点
            //这里可以看出,底层不是随机唤醒
                doSignal(first);
        }

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

在这里插入图片描述
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的
waitStatus 改为 -1
在这里插入图片描述
Thread-1 释放锁,进入 unlock 流程,略

doSignal()源码

      private void doSignal(Node first) {
            do {
            //将当前等待队列中的第一个节点,从等待队列的双向链表中断开连接
            //如果第一个节点有后继节点,那么让后继节点成为第一个节点
                if ( (firstWaiter = first.nextWaiter) == null)
                //如果没有后继节点,说明当前队列就一个元素,lastWaiter 指针值变为null
                    lastWaiter = null;
                    //从链表中断开连接
                first.nextWaiter = null;
                //transferForSignal尝试将被唤醒的节点,加入AQS阻塞队列尾部
                //这里可能会操作失败,因为在加入队列过程中,存在被打断,取消的可能
            } while (!transferForSignal(first) &&
            //如果加入队列尾部失败,就再尝试去唤醒等待队列中下一个节点
                     (first = firstWaiter) != null);
        }

transferForSignal(first)源码

    /**
     * 将当前节点从等待队列转移到同步队列
     * 转移成功返回true
     * 如果转移过程中当前节点被取消了,那么转移失败 
     */
    final boolean transferForSignal(Node node) {
        /*
         * 将当前节点的状态由-2切换为0,如果切换失败,表示当前节点被取消了
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * 将被唤醒的节点加入同步队列的尾部,如果加入成功,返回的是加入成功后该节点的前驱节点
         */
        Node p = enq(node);
        //获取前驱节点的状态
        int ws = p.waitStatus;
        //设置前驱节点的状态为-1,表示他有职责唤醒他的后继节点
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //如果转移失败了,会唤醒该节点的线程
            LockSupport.unpark(node.thread);
        return true;
    }

ConditionObject 类源码

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    // 第一个等待节点
    private transient Node firstWaiter;

    // 最后一个等待节点
    private transient Node lastWaiter;

    public ConditionObject() {
    }

    // ㈠ 添加一个 Node 至等待队列
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 所有已取消的 Node 从队列链表删除, 见 ㈡
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 创建一个关联当前线程的新 Node, 添加至队列尾部
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    // 唤醒 - 将没取消的第一个节点转移至 AQS 队列
    private void doSignal(Node first) {
        do {
            // 已经是尾节点了
            if ((firstWaiter = first.nextWaiter) == null) {
                lastWaiter = null;
            }
            first.nextWaiter = null;
        } while (
            // 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
                !transferForSignal(first) &&
                        // 队列还有节点
                        (first = firstWaiter) != null
        );
    }

    // 外部类方法, 方便阅读, 放在此处
    // ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
    final boolean transferForSignal(Node node) {
        // 如果状态已经不是 Node.CONDITION, 说明被取消了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 加入 AQS 队列尾部
        Node p = enq(node);
        int ws = p.waitStatus;
        if (
            // 上一个节点被取消
                ws > 0 ||
                        // 上一个节点不能设置状态为 Node.SIGNAL
                        !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
        ) {
            // unpark 取消阻塞, 让线程重新同步状态
            LockSupport.unpark(node.thread);
        }
        return true;
    }

    // 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }

    // ㈡
    private void unlinkCancelledWaiters() {
        // ...
    }

    // 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    // 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }

    // 不可打断等待 - 直到被唤醒
    public final void awaitUninterruptibly() {
        // 添加一个 Node 至等待队列, 见 ㈠
        Node node = addConditionWaiter();
        // 释放节点持有的锁, 见 ㈣
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        // 如果该节点还没有转移至 AQS 队列, 阻塞
        while (!isOnSyncQueue(node)) {
            // park 阻塞
            LockSupport.park(this);
            // 如果被打断, 仅设置打断状态
            if (Thread.interrupted())
                interrupted = true;
        }
        // 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }

    // 外部类方法, 方便阅读, 放在此处
    // ㈣ 因为某线程可能重入,需要将 state 全部释放
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    // 打断模式 - 在退出等待时重新设置打断状态
    private static final int REINTERRUPT = 1;
    // 打断模式 - 在退出等待时抛出异常
    private static final int THROW_IE = -1;

    // 判断打断模式
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
    }

    // ㈤ 应用打断模式
    private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

    // 等待 - 直到被唤醒或打断
    public final void await() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // 添加一个 Node 至等待队列, 见 ㈠
        Node node = addConditionWaiter();
        // 释放节点持有的锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 如果该节点还没有转移至 AQS 队列, 阻塞
        while (!isOnSyncQueue(node)) {
            // park 阻塞
            LockSupport.park(this);
            // 如果被打断, 退出等待队列
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 退出等待队列后, 还需要获得 AQS 队列的锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 所有已取消的 Node 从队列链表删除, 见 ㈡
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // 应用打断模式, 见 ㈤
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    // 等待 - 直到被唤醒或打断或超时
    public final long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        // 添加一个 Node 至等待队列, 见 ㈠
        Node node = addConditionWaiter();
        // 释放节点持有的锁
        int savedState = fullyRelease(node);
        // 获得最后期限
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        // 如果该节点还没有转移至 AQS 队列, 阻塞
        while (!isOnSyncQueue(node)) {
            // 已超时, 退出等待队列
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            // park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 如果被打断, 退出等待队列
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        // 退出等待队列后, 还需要获得 AQS 队列的锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 所有已取消的 Node 从队列链表删除, 见 ㈡
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        // 应用打断模式, 见 ㈤
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }

    // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
    public final boolean awaitUntil(Date deadline) throws InterruptedException {
        // ...
    }

    // 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
    public final boolean await(long time, TimeUnit unit) throws InterruptedException {
        // ...
    }
    // 工具方法 省略 ...
}

读写锁原理

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select ...from ... lock in share mode

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法写锁保护数据的 write() 方法

package reentrantLock;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantReadWriteLock;

import static java.lang.Thread.sleep;

@Slf4j
class DataContainer {
    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

    @SneakyThrows
    public Object read() {
        log.debug("获取读锁...");
        r.lock();
        try {
            log.debug("读取");
            sleep(1);
            return data;
        } finally {
            log.debug("释放读锁...");
            r.unlock();
        }
    }

    @SneakyThrows
    public void write() {
        log.debug("获取写锁...");
        w.lock();
        try {
            log.debug("写入");
            sleep(1);
        } finally {
            log.debug("释放写锁...");
            w.unlock();
        }
    }
}

测试 读锁-读锁 可以并发

package reentrantLock;

/**
 * @author 大忽悠
 * @create 2022/1/7 13:26
 */
public class Main
{
    public static void main(String[] args)
    {
        DataContainer dataContainer = new DataContainer();

        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.read();
        }, "t2").start();

    }
}

输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响

14:05:14.341 c.DataContainer [t2] - 获取读锁... 
14:05:14.341 c.DataContainer [t1] - 获取读锁... 
14:05:14.345 c.DataContainer [t1] - 读取
14:05:14.345 c.DataContainer [t2] - 读取
14:05:15.365 c.DataContainer [t2] - 释放读锁... 
14:05:15.386 c.DataContainer [t1] - 释放读锁...

测试 读锁-写锁 相互阻塞

在这里插入代码片

输出结果

14:04:21.838 c.DataContainer [t1] - 获取读锁... 
14:04:21.838 c.DataContainer [t2] - 获取写锁... 
14:04:21.841 c.DataContainer [t2] - 写入
14:04:22.843 c.DataContainer [t2] - 释放写锁... 
14:04:22.843 c.DataContainer [t1] - 读取
14:04:23.843 c.DataContainer [t1] - 释放读锁...

写锁-写锁 也是相互阻塞的,这里就不测试了


注意事项

        r.lock();
        try {
            // ...
            w.lock();
            try {
                // ...
            } finally {
                w.unlock();
            }
        } finally {
            r.unlock();
        }

package reentrantLock;

import java.util.concurrent.locks.ReentrantReadWriteLock;

class CachedData<ReentrantReadWriteLock> {
    Object data;
    // 是否有效,如果失效,需要重新计算 data
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            // 获取写锁前必须释放读锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock();
            }
        }
        // 自己用完数据, 释放读锁 
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}

缓存

1. 缓存更新策略

更新时,是先清缓存还是先更新数据库

先清缓存

在这里插入图片描述
先更新数据库

在这里插入图片描述
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

在这里插入图片描述
这种情况的出现几率非常小,见 facebook 论文


2. 读写锁实现一致性缓存

使用读写锁实现一个简单的按需加载缓存

package com;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class GenericCachedDao<T> {

    /**
     *  HashMap 作为缓存非线程安全, 需要保护
     */
    HashMap<SqlPair, T> map = new HashMap<>();

    /**
     * 读写锁
     */
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * 数据库查询类
     */
    GenericDao genericDao = new GenericDao();

    /**
     * <p>
     *     更新方法
     * </p>
     * @param sql
     * @param params
     * @return
     */
    public int update(String sql, Object... params) {
        SqlPair key = new SqlPair(sql, params);
        // 加写锁, 防止其它线程对缓存读取和更改
        lock.writeLock().lock();
        try {
            //先更新数据库
            int rows = genericDao.update(sql, params);
            //再清空缓存
            map.clear();
            return rows;
        } finally {
            //释放写锁
            lock.writeLock().unlock();
        }
    }

    /**
     * @param beanClass
     * @param sql
     * @param params
     * @return
     */
    public T queryOne(Class<T> beanClass, String sql, Object... params) {
        SqlPair key = new SqlPair(sql, params);
        // 加读锁, 防止其它线程对缓存更改
        lock.readLock().lock();
        try {
            T value = map.get(key);
            if (value != null) {
                return value;
            }
        } finally {
            lock.readLock().unlock();
        }
        // 加写锁, 防止其它线程对缓存读取和更改
        lock.writeLock().lock();
        try {
            // get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
            // 为防止重复查询数据库, 再次验证
            T value = map.get(key);
            if (value == null) {
                // 如果没有, 查询数据库
                value = genericDao.queryOne(beanClass, sql, params);
                map.put(key, value);
            }
            return value;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // 作为 key 保证其是不可变的
    class SqlPair
    {
        /**
         * sql
         */
        private String sql;
        /**
         * 参数
         */
        private Object[] params;

        public SqlPair(String sql, Object[] params) {
            this.sql = sql;
            this.params = params;
        }

        /**
         * @param o 
         * @return 这里需要重写equals和hashcode方法,因为默认是根据对象地址进行判断的
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SqlPair sqlPair = (SqlPair) o;
            return sql.equals(sqlPair.sql) && Arrays.equals(params, sqlPair.params);
        }


        /**
         * @return 这里的hashcode计算值是以内部属性计算得出的
         */
        @Override
        public int hashCode() {
            int result = Objects.hash(sql);
            result = 31 * result + Arrays.hashCode(params);
            return result;
        }
    }
}

注意

以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑

乐观锁实现:用 CAS 去更新


读写锁原理

1. 图解流程

加锁

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

在这里插入图片描述
t1 w.lock,t2 r.lock

1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

tryAcquire(int acquires)源码

 protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //获取state的值
            int c = getState();
            //获取写锁部分
            int w = exclusiveCount(c);
            //c!=0表示有人加锁了,但是这里高十六位或者第十六位不等于0都会导致c不等于0,无法判断是加了读锁还是写锁 
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                //如果写锁部分等于0,表示加的是读锁,读锁和写锁是互斥的,因此这里返回false
                //判断写锁是不是自己加的,如果是被人加的,返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                    //写锁重入次数超过最大值
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                //写锁可重入一次
                setState(c + acquires);
                return true;
            }
            //c==0还没有人加锁
            //writerShouldBlock--->写锁是否应该阻塞: 这里需要区分公平锁和非公平锁
            //公平锁:如果当前同步队列有老二,那么返回true,否则如果同步队列为空或者当前线程就是老二,返回false
            //非公平锁:总是返回false
            if (writerShouldBlock() ||
            //compareAndSetState--->设置c=1,设置写锁部分为1
                !compareAndSetState(c, c + acquires))
                return false;
                //设置当前线程为锁的拥有者
            setExclusiveOwnerThread(current);
            return true;
        }

在这里插入图片描述
在这里插入图片描述


2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败

tryAcquireShared 返回值表示

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

tryAcquireShared(int unused)源码

      protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            //查看是否加了写锁,如果!=0表示加了写锁,加了写锁返回true
            if (exclusiveCount(c) != 0 &&
            //如果加了写锁,并且是当前线程自己加的,那么返回false---这里是因为同一个线程可以先加写锁后加读锁
            //完成锁的降级操作
            //如果写锁是另一个线程加的,那么返回false
                getExclusiveOwnerThread() != current)
                //因为这里写锁是t1加的,而t2想要加读锁,因此这里返回-1
                return -1;
            .....          
                return 1;
            }
            return fullTryAcquireShared(current);
        }

在这里插入图片描述


3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
在这里插入图片描述
4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁

5)如果没有成功,在 doAcquireSharedfor (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;

标签:Node,JUC,node,int,工具包,节点,并发,return,final
来源: https://blog.csdn.net/m0_53157173/article/details/122327435