java中AQS究竟是做什么的?
作者:互联网
当你使用java实现一个线程同步的对象时,一定会包含一个问题:
你该如何保证多个线程访问该对象时,正确地进行阻塞等待,正确地被唤醒?
关于这个问题,java的设计者认为应该是一套通用的机制
因此将一套线程阻塞等待以及被唤醒时锁分配的机制称之为AQS
全称 AbstractQuenedSynchronizer
中文名即抽象的队列式同步器 。
基于AQS,实现了例如ReentenLock之类的经典JUC类。
AQS简要步骤
- 线程访问资源,如果资源足够,则把线程封装成一个Node,设置为活跃线程进入CLH队列,并扣去资源
- 资源不足,则变成等待线程Node,也进入CLH队列
- CLH是一个双向链式队列, head节点是实际占用锁的线程,后面的节点则都是等待线程所对应对应的节点
AQS的资源state
state定义
AQS中的资源是一个int值,而且是volatile的,并提供了3个方法给子类使用:
private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } // cas方法 compareAndSetState(int oldState, int newState);
如果state上限只有1,那么就是独占模式Exclusive,例如 ReentrantLock
如果state上限大于1,那就是共享模式Share,例如 Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
已经有CAS方法了,为什么资源state还要定义成volatile的?
对外暴露的getter/setter方法,是走不了CAS的。而且setter/getter没有被synchronized修饰。所以必须要volatile,保证可见性
这样基于AQS的实现可以直接通过getter/setter操作state变量,并且保证可见性,也避免重排序带来的影响。比如CountDownLatch,ReentrantReadWriteLock,Semaphore都有体现(各种getState、setState)
对资源的操作什么时候用CAS,什么使用setState?
volatile的state成员有一个问题,就是如果是复合操作的话不能保证复合操作的原子性
因此涉及 state增减的情况,采用CAS
如果是state设置成某个固定值,则使用setState
AQS的CLH队列
为什么需要一个CLH队列
这个队列的目的是为了公平锁的实现
即为了保证先到先得,要求每个线程封装后的Node按顺序拼接起来。
CLH本质?是一个Queue容器吗
不是的,本质上是一个链表式的队列
因此核心在于链表节点Node的定义
除了比较容易想到的prev和next指针外
还包含了该节点内的线程
以及 waitStatus 等待状态
4种等待状态如下:
- CANCELLED(1): 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
- SIGNAL(-1):后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
- CONDITION(-2) : 点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
- PROPAGATE(-3) : 表示下一次共享式同步状态获取将会无条件地传播下去
- INIT( 0):
入队是怎么保证安全的?
入队过程可能引发冲突
因此会用CAS保障入队安全。
private Node enq(final Node node) { //多次尝试,直到成功为止 for (;;) { Node t = tail; //tail不存在,设置为首节点 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //设置为尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
出队过程会发生什么?
一旦有节点出队,说明有线程释放资源了,队头的等待线程可以开始尝试获取了。
于是首节点的线程释放同步状态后,将会唤醒它的后继节点(next)
而后继节点将会在获取同步状态成功时将自己设置为首节点
注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态
AQS详细资源获取流程
1. tryAcquire尝试获取资源
AQS使用的设计模式是模板方法模式。
具体代码如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 发现中断过,则触发中断异常 selfInterrupt(); }
即AQS抽象基类AbstractQueuedSynchronizer给外部调用时,都是调的acquire(int arg)方法。这个方法的内容是写死的。
而acquire中,需要调用tryAcquire(arg), 这个方法是需要子类实现的,作用是判断资源是否足够获取arg个
(下面部分代码注释选自: (2条消息) AQS子类的tryAcquire和tryRelease的实现_Mutou_ren的博客-CSDN博客_aqs tryacquire )
ReentrantLock中的tryAcquire实现
这里暂时只谈论一种容易理解的tryAcuire实现,其他附加特性的tryAcquire先不提。
里面主要就做这几件事:
- 获取当前锁的资源数
- 资源数为0,说明可以抢, 确认是前置节点是头节点,进行CAS试图争抢,抢成功就返回true,并设置当前线程
- 没抢成功,返回false
- 如果是重入的,则直接set设置增加后的状态值,状态值此时不一定为0和1了
protected final boolean tryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); // state==0代表当前没有锁,可以进行获取 if (c == 0) { // 非公平才有的判断,会判断是否还有前驱节点,直接自己为头节点了或者同步队列空了才会继续后面的锁的获取操作 if (!hasQueuedPredecessors() //CAS设置state为acquires,成功后标记exclusiveOwnerThread为当前线程 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 当前占有线程等于自己,代表重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 出现负数,说明溢出了 if (nextc < 0) // throw new Error("Maximum lock count exceeded"); // 因为是重入操作,可以直接进行state的增加,所以不需要CAS setState(nextc); return true; } return false; }
2.addWaiter 添加到等待队列
当获取资源失败,会进行addWaiter(Node.EXCLUSIVE), arg)。
目的是创建一个等待节点Node,并添加到等待队列
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 通过CAS竞争队尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 竞争队尾失败,于是进行CAS频繁循环竞争队尾 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
3. acquireQueued循环阻塞-竞争
并在 "处于头节点时尝试获取资源->睡眠->唤醒“中循环。
当已经跑完任务的线程释放资源时,会唤醒之前阻塞的线程。
当被唤醒后,就会检查自己是不是头节点,如果不是,且认为可以阻塞,那就继续睡觉去了
(下面代码注释部分选自AQS(acquireQueued(Node, int) 3)–队列同步器 - 小窝蜗 - 博客园 (http://cnblogs.com) )
final boolean acquireQueued(final Node node, int arg) { // 标识是否获取资源失败 boolean failed = true; try { // 标识当前线程是否被中断过 boolean interrupted = false; // 自旋操作 for (;;) { // 获取当前节点的前继节点 final Node p = node.predecessor(); // 如果前继节点为头结点,说明排队马上排到自己了,可以尝试获取资源,若获取资源成功,则执行下述操作 if (p == head && tryAcquire(arg)) { // 将当前节点设置为头结点 setHead(node); // 说明前继节点已经释放掉资源了,将其next置空,好让虚拟机提前回收掉前继节点 p.next = null; // help GC // 获取资源成功,修改标记位 failed = false; // 返回中断标记 return interrupted; } // 若前继节点不是头结点,或者获取资源失败, // 则需要判断是否需要阻塞该节点持有的线程 // 若可以阻塞,则继续执行parkAndCheckInterrupt()函数, // 将该线程阻塞直至被唤醒 // 唤醒后会检查是否已经被中断,若返回true,则将interrupted标志置于true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 最终获取资源失败,则当前节点放弃获取资源 if (failed) cancelAcquire(node); } }
4.shouldParkAfterFailedAcquire 检查是否可以阻塞
该方法不会直接阻塞线程,因为一旦线程挂起,后续就只能通过唤醒机制,中间还发生了内核态用户态切换,消耗很大。
因此会先不断确认前继节点的实际状态,在只能阻塞的情况下才会去阻塞。
并且会过滤掉cancel的线程节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前继节点的等待状态 int ws = pred.waitStatus; // 如果等待状态为Node.SIGNAL(-1),则直接返回true即可以阻塞 // 因为这说明前继节点完成资源的释放或者中断后,会主动唤醒后继节点的(这也即是signal信号的含义),因此方法外面不用再反复CAS了,直接阻塞吧 if (ws == Node.SIGNAL) return true; // 如果前继节点的等待值大于0即CANCELLED(1),说明前继节点的线程发生过cancel动作 // 那就继续往前遍历,直到当前节点的前继节点的状态不为cancel if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 前继节点的等待状态不为SIGNAL(-1),也不为Cancel(1) // 那么只能是PROPAGATE(-3)或者CONDITION(-2)或者INITIAL(0) // 直接设置成SIGNAL,下一次还没CAS成功,就直接睡觉了 // 因此在前面所有节点没辩护的情况下, 最多一次之后就会返回true让外面阻塞 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
5.parkAndCheckInterrupt() 阻塞线程
使用LockSupport.park来阻塞当前这个对象所在的线程
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 确认是否是中断导致的park结束,并清除中断标记 return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
lockSupport.park()和普通的wait|notify都有啥区别?
- 面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
- 实现机制不同。虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyAll()不能唤醒LockSupport的阻塞Thread.
如果还要深挖底层实现原理,可以详细见该链接
简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1。
底层用的C语言的pthread_mutex_unlock、pthread_cond_wait 、pthread_cond_signal ,但是针对了mutex和_cond两个变量进行加锁。