从零写一个基于 AQS 实现的 Lock(一)
作者:互联网
AQS
gitee 地址
https://gitee.com/haohaos/aqs
首先考虑 AQS 解决了什么?也就是它能用来做什么?
java.util.concurrent
包下存在很多的并发工具类都是基于 AQS 实现的,例如 ReentrantLock,CountDownLatch。
在并发场景下锁的应用是最多的。但是 synchronized
在一些场景下显的太重了。所以可以考虑使用 Lock 的实现类,例如 ReentrantLock 来实现锁机制。
下面以一个 Lock 的视角来学习 AQS 是怎么实现的。解决了什么问题。
1.0
传统 Java 锁 synchronized
通过对对象的抢占来实现锁。
Object o = new Object();
synchronized(o){
// doSomething
}
在 AQS 中通过对一个 int 类型的变量 state 通过抢占(加,减)来实现锁。
-
思考:如何实现线程安全的对 state 进行抢占?
可以假定,state 初始值为 0 ,一个线程进行
state++
将 state 赋值 1,实现抢占。可是众所周知
state++
不是一个原子操作,无法保证线程安全。所以可以采用 JDK 的 Unsafe 来实现 CAS 操作,从而进行线程安全的递增或递减。为什么共享变量 state 为 int 类型,用 boolean 值不是也可以吗?两者的操作有什么不同?
-
第一版 AQS
public class AQS { private static final long stateOffset; private static final Unsafe UNSAFE; /** * Unsafe 类无法直接被开发人员使用,可以通过反射获取 */ static { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); UNSAFE = (Unsafe) field.get(null); stateOffset = UNSAFE.objectFieldOffset(AQS.class.getDeclaredField("state")); } catch (Exception e) { throw new Error(e); } } // 设置 volatile 保证内存可见性,防止指令重排序 private volatile int state = 0; public int getState() { return state; } public void setState(int state) { this.state = state; } public boolean compareAndSetState(int expect, int update){ return UNSAFE.compareAndSwapInt(this,stateOffset,expect,update); }
-
使用 AQS 模拟自旋锁来保证线程安全
public class Test { static int num = 0; static void manyThreadAddNum(int threadNum,int addNum) throws InterruptedException { Thread[] threads = new Thread[threadNum]; for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(()->{ for (int j = 0; j < addNum; j++) { num++; } }); threads[i].start(); } for (Thread thread : threads) { thread.join(); } System.out.println("num:" + num + "," + "target:" + (threadNum * addNum)); } static void manyThreadAddNumByAQS(int threadNum,int addNum) throws InterruptedException { Thread[] threads = new Thread[threadNum]; AQS aqs = new AQS(); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(()->{ for (int j = 0; j < addNum; j++) { while (!aqs.compareAndSetState(0,1)){}; // 获取不到锁一直循环,模拟自旋锁 num++; aqs.setState(0); // 释放锁 } }); threads[i].start(); } for (Thread thread : threads) { thread.join(); } System.out.println("num:" + num + "," + "target:" + (threadNum * addNum)); } public static void main(String[] args) throws InterruptedException { // manyThreadAddNum(1000,1000); // num:998442,target:1000000 不使用 AQS manyThreadAddNumByAQS(1000,1000); // 使用 AQS } }
2.0
-
思考 1.0 版本有什么不足?
while (!aqs.compareAndSetState(0,1)){}; // 获取不到锁一直循环,模拟自旋锁
如果获取不到锁,便一直自旋,十分浪费 cpu 资源。
此时思考一下
synchronized
是怎么做的,如果自旋锁连续十次获取不到锁,升级为重量级锁。线程阻塞。那么如果自己实现该怎么做?
-
如何阻塞未竞争到锁的线程?
可以采用两种措施 Object.wait()+Object.notify() 或 LockSupport.park()+LockSupport.unpark();
wait() park() 必须在被锁对象的 synchronized 代码块内执行 可以任意处执行 方法不带中断异常 方法带中断异常 唤醒后一定执行后续代码 唤醒后不一定执行后续代码 会释放占有锁资源 不会释放占有锁资源 https://blog.csdn.net/asdasdasd123123123/article/details/107814280
可见无论从实用性还是灵活性上
LockSupport.park()
都更胜一筹。 -
如何管理未竞争到锁的线程?
当前占有锁的线程释放锁后,肯定要通知等待线程。
为此需要一个管理数据结构来保存这些线程。
按照相对公平来说,采用队列(FIFO)先进先出来管理比较合适。
一个线程尝试获取锁如果获取的到就执行,获取不到就阻塞并放入队列等待调度。
-
为了保证代码的封装性,将队列封装到 AQS 内部,为静态内部类。
public class AQS { // 当前占有线程 private volatile Thread curcentThread; // 设置头尾节点方便对入队出队操作 private volatile Node head; private volatile Node tail; static class Node{ volatile Node prev; volatile Node next; volatile Thread thread; public Node() { } public Node(Thread thread) { this.thread = thread; } } }
中间全为代码片段,完整代码放到文章 2.0 最后
-
因为对队列的入队出队操作需要操作头尾节点,考虑并发安全,为头尾节点设置 CAS 安全机制。
public class AQS { private volatile Node head; private volatile Node tail; private static final long headOffset; private static final long tailOffset; private static final Unsafe UNSAFE; /** * Unsafe 类无法直接被开发人员使用,可以通过反射获取 */ static { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); UNSAFE = (Unsafe) field.get(null); headOffset = UNSAFE.objectFieldOffset(AQS.class.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset(AQS.class.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } /** * 对链表的操作头部为傀儡节点,方便后续节点操作 */ public final boolean compareAndSetHead(Node update) { return UNSAFE.compareAndSwapObject(this, headOffset, null, update); } public final boolean compareAndSetTail(Node expect, Node update) { return UNSAFE.compareAndSwapObject(this, tailOffset, expect, update); } }
-
添加对于线程执行任务的操作,
/** * 获得 * 先尝试获取,如果此时没有任何线程占有和执行,获取成功,直接执行 * 否则添加到队列等待 */ public void accquire(int arg){ if (!tryAccquire(arg)){ addQueue(); } } /** * 尝试获得 */ private boolean tryAccquire(int arg){ return false; } /** * 添加到等待队列 */ private void addQueue(){ } /** * 释放 * 如果释放成功,唤醒另一个线程 */ public boolean release(int arg){ return tryRelease(arg); } /** * 尝试释放 * @return */ private boolean tryRelease(int arg){ return false; }
-
锁 Lock
public class Lock { AQS aqs = new AQS(); public void lock(){ } public void unLock(){ } }
下面思考几个场景:
- 当前线程获取锁,无等待线程和执行线程
- 比较并设定 state
- 设置独占线程
- 当前线程获取锁,有执行线程无等待线程
- 新增虚拟头节点,将当前线程节点插入到队列尾部
- 在插入过程中(还没有插入),执行线程释放,但是此时队列中又没有数据
- 插入后重试是否可以执行
- 在插入过程中(还没有插入),执行线程释放,但是此时队列中又没有数据
- 新增虚拟头节点,将当前线程节点插入到队列尾部
- 当前线程获取锁,有执行线程有等待线程
- 插入到队列后面,并阻塞线程
- 当前线程释放锁,无等待线程
- 直接释放
- 当前线程释放锁,有等待线程
- 释放通知后续线程节点
- 如果通知后续节点时有新线程进来怎么办
思考问题,解决问题:
-
如何保证只有一个线程可以抢占成功?考虑锁的可重入问题?
线程尝试判断 state 是否为 0,如果为 0,尝试使用 cas 加锁。加上锁设置独占线程为自己。
如果 state 不为 0 ,查看当前独占线程是否为自己,如果是保证锁的可重入性,state + 1。
private boolean tryAccquire(int arg){ int state = getState(); if (state == 0){ int newState = arg; if (compareAndSetState(0,newState)){ setCurcentThread(Thread.currentThread()); // 设置独占线程 return true; } } // 如果此时独占线程是当前线程,可重入 else if (getCurcentThread() == Thread.currentThread()){ int newState = arg + getState(); setState(newState); return true; } return false; }
-
如果线程获取不到锁,如何使线程阻塞?如何使线程解除阻塞后正常执行逻辑?
先调用
addQueue()
插入到等待队列。随后再一次请求占用 state 。==注意:==为什么要再一次请求占用?
试想一种情况,
线程 1 占用,线程 2 获取占用失败
线程 2 准备插入到队列(此时还没有插入),线程 1 释放占用同时查看队列有无等待节点(此时 2 还有没插入,故等待队列无等待节点)
线程 2 永久阻塞。
所以需要在插入队列后再一次请求看能否获取到占用
-
如何使线程解除阻塞后正常执行逻辑?
注意下面
doAccquire()
方法中的for(;;)
死循环。插入到队列后尝试占用,占用到就返回,不阻塞线程,占用不到调用LockSupport.park(node.thread);
等待占用线程执行完毕后调用LockSupport.unpark
解除阻塞线程,直到占用到 state 为止。占用成功后将当前节点 Thread 设置为 null,同时将当前节点置换为头节点,即傀儡节点。
private void doAccquire() { // 插入到尾节点 Node node = addQueue(); // 再次尝试是否可以执行 /** * 考虑一种情况: * 工作线程执行完毕,查看队列有无等待线程 * 但此时当前线程 Node 节点刚被创建,还没有插入到队列中, * 工作线程释放占用,但是此时该节点永远不会被调用, * 所以要在插入后尝试当前线程节点的上一个节点是不是 head ,如果是立即被调用 * 同时将当前线程节点线程置空,并将 head 指针指向当前线程 * 当前线程被调度,同时 Node 节点成为新的傀儡节点 head */ for (;;){ final Node prev = node.prev; // 检查是否可以被调度 /** * 此时可能有新的线程过来抢占, * 所以需要 head 节点的下一个节点线程重新执行 tryAccquire() 进行抢占 */ if (prev == head && tryAccquire()){ node.waitStatus = 1; setHead(node); node.prev = null; node.thread = null; prev.next = null; return; } /** * 此时已经被插入到队列中 * 思考一种情况: * 如果独占线程释放锁后调度下一个节点线程 * 而下一个节点线程刚好执行到这一行 * 线程是否会永久阻塞? * 如果 LockSupport.unpark(thread) 在 LockSuppert.park(thread) * 之前执行过那么 LockSuppert.park(thread) 并不会阻塞 */ LockSupport.park(node.thread); } } /** * 添加到等待队列 */ private Node addQueue(){ for (;;){ Node tail = getTail(); /** * 如果尾节点为 null,说明头节点也为 null, * 竞争设置头节点,设置成功的线程将头节点赋值给尾节点 * 此时如果有别的线程竞争,发现尾节点为 null,但是此时头节点不为 null,设置失败,可以保证并发安全 */ if (tail == null){ Node newHead = new Node(); if (compareAndSetHead(newHead)){ // 设置头部虚拟节点 setTail(newHead); } } else { /** * 如果尾节点不为空,尝试设置当前节点为尾节点 * 成功后将原先尾节点的下一个指针指向当前节点 * 设置完毕后当前节点为尾节点,其他线程的节点可以继续往后插,但是上一个节点的 next 指针并未指向当前节点,调度不到,保证并发安全 */ Node cur = new Node(Thread.currentThread(),0); cur.prev = tail; if (compareAndSetTail(tail,cur)){ // 将当前节点设置为尾节点 tail.next = cur; return cur; } } } }
-
-
当前线程如何如何释放占有?
判断占有 state 线程是不是当前线程,是的话才可以释放,并且判断释放后 state 是否等于 0,因为要考虑锁的重入性问题。
setState(newState); 一定要在最后设置,保证所有状态更新完毕后再释放对 state 的占用。
private boolean tryRelease(){ if (getCurcentThread() == Thread.currentThread() && getState() > 0){ int state = getState(); int newState = state - 1; /** * 要保证返回前所有状态修改在修改与记录在 state 之前 */ boolean free = false; if (newState == 0){ setCurcentThread(null); free = true; } setState(newState); return free; } return false; }
-
当前线程释放占用后如何通知后续节点
释放后,找到下一个节点,并且使用
LockSupport.unpark(node.thread);
唤醒下一个线程/** * 释放 * 如果释放成功,唤醒另一个线程 * 设置为 0 之后再去唤醒后面线程 */ public void release(){ if (tryRelease()){ // 唤醒等待线程 unparkSuccessor(); } } /** * 抢占线程已经释放 * head 进行了改变 */ private void unparkSuccessor(){ // head 可能会变,但是 h 指向不变 Node h = head; Node node = null; if (h != null) node = h.next; if (h != null && node != null){ /** * 如何保证重试当前线程没有被调度 * 如果已经被调度, * node 变成头节点 * */ LockSupport.unpark(node.thread); } }
注意
private void unparkSuccessor(){ if (head != null && head.next != null){ Node node = head.next; /** * 如何保证重试当前线程没有被调度 * 如果已经被调度 */ LockSupport.unpark(node.thread); }
这是笔者最初写的唤醒下一个线程的方法,但是在第 8 行会报空指针异常。
思考:为什么会报空指针异常?
假设 t1 占用,t2 插入队列,但是还没有重试
t1 释放占用,判断此时 (head != null && head.next != null) head 和 head 的下一个节点都不为空,所以需要唤醒下一个节点。此时 t2 节点重试,发现占用成功,将 t2 对应节点设置为傀儡节点 head,此时 t1 线程重新获取 head,next 。获取的节点是 t2 节点对应的下一个节点自然为空。
更换新的代码逻辑
private void unparkSuccessor(){ // head 可能会变,但是 h 指向不变 Node h = head; Node node = null; if (h != null) node = h.next; if (h != null && node != null){ /** * 如何保证重试当前线程没有被调度 * 如果已经被调度, * node 变成头节点 * */ LockSupport.unpark(node.thread); } }
此时保存内存快照 head 。
无论 head 如何变化,我保存的内存快照 h 还是指向之前 head 的内存地址。
但是我调用已经过期的 head 不会产生问题吗?
此时 T2 已经过期,那我还调用 LockSupport.unpark(T2); 会不会产生问题,甚至可能是 T2 之后的 T3 T4 T5 … 等等依次已经占用过 state,而使用过期那么久的线程 LockSupport.unpark(T2); 是否会产生问题?如果 T2 执行完成之后,再次获取锁,同时阻塞了,此时调用先前的 LockSupport.unpark(T2); 会不会产生影响?
观察以下逻辑
private void doAccquire() { // 插入到尾节点 Node node = addQueue(); for (;;){ final Node prev = node.prev; if (prev == head && tryAccquire()){ node.waitStatus = 1; setHead(node); node.prev = null; node.thread = null; prev.next = null; return; } LockSupport.park(node.thread); } }
我解除阻塞后,如果是原先不阻塞的线程不会有任何影响。如果是原先阻塞的话,无非是 for(;
标签:Node,node,AQS,Lock,state,线程,零写,null,节点 来源: https://blog.csdn.net/weixin_45088998/article/details/120289212