011Java并发包012AQS
作者:互联网
注意:本文基于JDK1.8进行记录。
1 简介
1.1 是什么
AQS是英文单词AbstractQueuedSynchronizer的缩写,翻译过来就是抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch等等。
AQS是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个state整型变量表示持有锁的状态。
1.2 抽象
AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
1.3 原理
抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢。
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的Node结点,通过CAS、自旋以及LockSupport的凭证机制,维护state变量的状态,使并发达到同步的控制效果。
CLH:Craig、Landin、Hagersten(三个科学家名字)队列,原版是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO,其头节点在初始化后变为空节点。
1.4 资源使用方式
AQS定义两种资源使用方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
2 体系架构
1 public abstract class AbstractQueuedSynchronizer 2 extends AbstractOwnableSynchronizer 3 implements java.io.Serializable { 4 ... 5 // 内部封装Node节点 6 static final class Node { 7 // 标记线程以共享的模式等待锁 8 static final Node SHARED = new Node(); 9 // 标记线程以独占的模式等待锁 10 static final Node EXCLUSIVE = null; 11 // waitStatus取值为1表示线程取消(超时、中断),被取消的节点不会阻塞 12 static final int CANCELLED = 1; 13 // waitStatus取值为-1表示后继节点已经准备完成,等待线程释放资源 14 static final int SIGNAL = -1; 15 // waitStatus取值为-2表示线程在Condition队列中阻塞,当其他线程调用了Condition中的唤醒方法后,将节点从Condition队列转移到CLH等待队列(Condition中有使用) 16 static final int CONDITION = -2; 17 // waitStatus取值为-3表示线程及后续线程无条件传播(共享模式可用,CountDownLatch中有使用) 18 static final int PROPAGATE = -3; 19 // 线程的等待状态,初始值为0 20 volatile int waitStatus; 21 // 前驱节点 22 volatile Node prev 23 // 后继节点 24 volatile Node next; 25 // 线程对象 26 volatile Thread thread; 27 ... 28 } 29 // 头节点 30 private transient volatile Node head 31 // 尾节点 32 private transient volatile Node tail; 33 // 资源状态,0表示可获取,大于等于1表示已占用 34 private volatile int state; 35 // 获取资源状态 36 protected final int getState() { 37 return state; 38 } 39 // 设置资源状态 40 protected final void setState(int newState) { 41 state = newState; 42 } 43 // CAS设置资源状态 44 protected final boolean compareAndSetState(int expect, int update) { 45 // See below for intrinsics setup to support this 46 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 47 } 48 ... 49 }
AQS使用了一个volatile修饰的整型变量state用来表示同步状态,通过内置的CLH同步队列来完成线程的排队工作。
其中,对state值的修改是通过CAS完成的,0表示资源可用,大于等于1表示资源不可用。AQS提供了三种操作state的方法:getState()、setState()、compareAndSetState()。
当前线程根据state的值判断能否获取资源,如果获取失败,AQS会将当前线程thread以及等待状态waitStatus等信息封装成Node节点,并将其加CLH入同步队列,同时阻塞当前线程。当state的值变为可获取资源后,会把Node节点中的线程唤醒,再次尝试获取资源。
3 Lock与AQS
Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的。
1 public class ReentrantLock implements Lock, java.io.Serializable { 2 ... 3 abstract static class Sync extends AbstractQueuedSynchronizer { 4 ... 5 final boolean nonfairTryAcquire(int acquires) { 6 final Thread current = Thread.currentThread(); 7 int c = getState(); 8 if (c == 0) { 9 if (compareAndSetState(0, acquires)) { 10 setExclusiveOwnerThread(current); 11 return true; 12 } 13 } 14 else if (current == getExclusiveOwnerThread()) { 15 int nextc = c + acquires; 16 if (nextc < 0) // overflow 17 throw new Error("Maximum lock count exceeded"); 18 setState(nextc); 19 return true; 20 } 21 return false; 22 } 23 ... 24 } 25 static final class NonfairSync extends Sync { 26 ... 27 final void lock() { 28 if (compareAndSetState(0, 1)) 29 setExclusiveOwnerThread(Thread.currentThread()); 30 else 31 acquire(1); 32 } 33 protected final boolean tryAcquire(int acquires) { 34 return nonfairTryAcquire(acquires); 35 } 36 } 37 static final class FairSync extends Sync { 38 ... 39 final void lock() { 40 acquire(1); 41 } 42 protected final boolean tryAcquire(int acquires) { 43 final Thread current = Thread.currentThread(); 44 int c = getState(); 45 if (c == 0) { 46 if (!hasQueuedPredecessors() && 47 compareAndSetState(0, acquires)) { 48 setExclusiveOwnerThread(current); 49 return true; 50 } 51 } 52 else if (current == getExclusiveOwnerThread()) { 53 int nextc = c + acquires; 54 if (nextc < 0) 55 throw new Error("Maximum lock count exceeded"); 56 setState(nextc); 57 return true; 58 } 59 return false; 60 } 61 } 62 public ReentrantLock() { 63 sync = new NonfairSync(); 64 } 65 public ReentrantLock(boolean fair) { 66 sync = fair ? new FairSync() : new NonfairSync(); 67 } 68 ... 69 }
ReentrantLock类的内部聚合了一个Sync类,Sync类继承了AQS类,并且非公平锁NonfairSync和公平锁FairSync都继承自Sync,默认创建的是非公平锁NonfairSync。
4 分析ReentrantLock
4.1 概述
整个ReentrantLock的加锁过程,可以分为三个阶段:
1)尝试加锁。
2)加锁失败,线程入队列。
3)线程入队列后,进入阻赛状态。
4.2 场景举例
举例三个客户在银行办理业务,使用默认的非公平锁:
1 public static void main(String[] args) { 2 Lock lock = new ReentrantLock(); 3 new Thread(()->{ 4 lock.lock(); 5 try { 6 System.out.println(Thread.currentThread().getName() + "-----办理业务"); 7 try { 8 TimeUnit.SECONDS.sleep(60); 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } 12 System.out.println(Thread.currentThread().getName() + "-----离开"); 13 } finally { 14 lock.unlock(); 15 } 16 }, "A").start(); 17 new Thread(()->{ 18 lock.lock(); 19 try { 20 System.out.println(Thread.currentThread().getName() + "-----办理业务"); 21 try { 22 TimeUnit.SECONDS.sleep(60); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 System.out.println(Thread.currentThread().getName() + "-----离开"); 27 } finally { 28 lock.unlock(); 29 } 30 }, "B").start(); 31 new Thread(()->{ 32 lock.lock(); 33 try { 34 System.out.println(Thread.currentThread().getName() + "-----办理业务"); 35 try { 36 TimeUnit.SECONDS.sleep(60); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 System.out.println(Thread.currentThread().getName() + "-----离开"); 41 } finally { 42 lock.unlock(); 43 } 44 }, "C").start(); 45 }
5 程序分析
5.1 线程A开始并执行
5.1.1 获取资源
线程A进入,调用lock()方法,查看实现:
1 final void lock() { 2 // 使用CAS设置state为1 3 if (compareAndSetState(0, 1)) 4 // 表示获取资源成功,将当前线程设为占用线程 5 setExclusiveOwnerThread(Thread.currentThread()); 6 else 7 // 表示获取资源失败,继续抢占资源 8 acquire(1); 9 }
因为线程A是第一个获取资源的线程,所以使用compareAndSetState()方法设置成功,继续调用setExclusiveOwnerThread()方法将当前线程设为占用线程,然后继续执行业务。
5.2 线程B开始并阻塞
5.2.1 获取资源
线程B进入,调用lock()方法。
因为线程B是第二个获取资源的线程,线程A已经将state从0改为了1,所以使用compareAndSetState()方法设置失败,继续调用acquire()方法获取资源,查看实现:
1 public final void acquire(int arg) { 2 // 抢占资源 3 if (!tryAcquire(arg) && 4 // 加入等待队列 5 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 6 // 线程阻塞 7 selfInterrupt(); 8 }
如果抢占资源成功,调用tryAcquire()方法返回true,判断条件结束,继续执行业务。
如果抢占资源失败,继续判断acquireQueued()方法返回。执行addWaiter()方法并传入参数表示使用独占模式将线程加入到等待队列。
5.2.2 抢占资源
线程B进入,继续调用acquire()方法获取资源,执行tryAcquire()方法,查看实现:
1 protected final boolean tryAcquire(int acquires) { 2 // 继续调用非公平锁的尝试抢占方法 3 return nonfairTryAcquire(acquires); 4 }
继续调用非公平锁的nonfairTryAcquire()方法,返回false表示占用失败:
1 final boolean nonfairTryAcquire(int acquires) { 2 // 记录当前线程 3 final Thread current = Thread.currentThread(); 4 // 记录当前资源状态 5 int c = getState(); 6 // 0表示当前资源可用 7 if (c == 0) { 8 // 使用CAS设置state为请求数 9 if (compareAndSetState(0, acquires)) { 10 // 表示获取资源成功,将当前线程设为占用线程 11 setExclusiveOwnerThread(current); 12 return true; 13 } 14 } 15 // 大于等于1表示当前资源被占用,判断当前线程是否为占用线程(可重入锁的情况) 16 else if (current == getExclusiveOwnerThread()) { 17 // 当前线程为占用线程,记录资源状态 18 int nextc = c + acquires 19 // 判断是否溢出 20 if (nextc < 0) // overflow 21 throw new Error("Maximum lock count exceeded"); 22 // 设置state为新的资源状态 23 setState(nextc); 24 return true; 25 } 26 return false; 27 }
5.2.3 进入等待
线程B进入,继续调用addWaiter()方法将当前线程加入等待队列,查看实现:
1 private Node addWaiter(Node mode) { 2 // 将当前线程和传入的独占模式封装为节点 3 Node node = new Node(Thread.currentThread(), mode); 4 // Try the fast path of enq; backup to full enq on failure 5 Node pred = tail; 6 // 尾节点不为空,表示CLH队列已经初始化,CAS操作将当前节点设为尾节点 7 if (pred != null) { 8 node.prev = pred; 9 if (compareAndSetTail(pred, node)) { 10 pred.next = node; 11 return node; 12 } 13 } 14 // 尾节点为空,表示CLH队列还未初始化,初始化队列 15 enq(node); 16 return node; 17 }
因为线程B是第一个进入等待的线程,尾节点为空,继续查看enq()方法:
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 // 尾节点为空,通过CAS设置头节点和尾节点为空节点 5 if (t == null) { // Must initialize 6 if (compareAndSetHead(new Node())) 7 tail = head; 8 } else { 9 // 尾节点不为空,通过CAS将当前节点作为新的尾节点 10 node.prev = t; 11 if (compareAndSetTail(t, node)) { 12 t.next = node; 13 return t; 14 } 15 } 16 } 17 }
初始化CLH队列后,头节点为空节点,尾节点为当前节点。
5.2.4 阻塞线程
线程B得到当前节点后,作为参数传入acquireQueued()方法继续执行:
1 final boolean acquireQueued(final Node node, int arg) { 2 // 记录当前节点是否取消,默认为true,表示取消 3 boolean failed = true; 4 try { 5 // 标记当前节点是否中断,默认为false,表示当前节点没有中断 6 boolean interrupted = false 7 // 自旋 8 for (;;) { 9 // 获取当前节点的上一节点 10 final Node p = node.predecessor(); 11 // 如果当前节点是头节点,表示当前节点即将被唤醒,尝试抢占资源 12 if (p == head && tryAcquire(arg)) { 13 // 将当前节点设为头节点,置空当前节点的上一节点,并取消当前节点同当前线程的绑定 14 setHead(node); 15 // 将原头节点的下一节点置空,方便GC回收 16 p.next = null; // help GC 17 // 标记当前节点为false,表示没有取消 18 failed = false; 19 // 返回false,表示当前节点没有中断 20 return interrupted; 21 } 22 // 不管当前节点是不是头节点,执行到这里就表示获取资源失败,处理前置节点并阻塞当前节点 23 if (shouldParkAfterFailedAcquire(p, node) && 24 parkAndCheckInterrupt()) 25 // 标记为true,表示当前节点中断 26 interrupted = true; 27 } 28 } finally { 29 // 当前节点如果被取消,执行取消操作 30 if (failed) 31 cancelAcquire(node); 32 } 33 }
因为线程B是第一个进入等待的线程,上一节点为头节点,尝试获取资源。获取成功则将当前节点作为头节点并移除当前线程,获取失败则进入判断。
在判断条件中调用shouldParkAfterFailedAcquire()方法,处理前置节点:
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 // 记录上一节点的等待状态 3 int ws = pred.waitStatus; 4 if (ws == Node.SIGNAL) 5 // 如果上一节点的等待状态为-1,表示当前线程可以被阻塞,返回true,执行parkAndCheckInterrupt()方法 6 return true; 7 if (ws > 0) { 8 // 如果上一节点的等待状态为1,表示上一节点被取消,循环移除被取消的上一节点 9 do { 10 node.prev = pred = pred.prev; 11 } while (pred.waitStatus > 0); 12 pred.next = node; 13 } else { 14 // 上述条件不满足,表示上一节点的等待状态为0或者-3,通过CAS将等待状态设置为-1 15 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 16 } 17 // 返回false,跳过parkAndCheckInterrupt()方法,重新进入自旋 18 return false; 19 }
因为线程B是第一个进入等待的线程,上一节点为头节点,头节点为空节点,等待状态为0,所以两次进入此方法。
第一次进入shouldParkAfterFailedAcquire()方法将上一节点的等待状态设置为-1后返回false,条件判断为false重新进入自旋。
第二次进入shouldParkAfterFailedAcquire()方法检测到上一节点的等待状态为-1,返回true,继续判断parkAndCheckInterrupt()方法。
在判断条件中调用parkAndCheckInterrupt()方法,阻塞当前节点:
1 private final boolean parkAndCheckInterrupt() { 2 // 使用LockSupport的park()方法阻塞当前节点 3 LockSupport.park(this); 4 // 返回线程的中断状态 5 return Thread.interrupted(); 6 }
线程B在此被阻塞。
5.3 线程C开始并阻塞
5.3.1 获取资源
线程C进入,调用lock()方法。
因为线程C是第三个获取资源的线程,线程A已经将state从0改为了1,所以使用compareAndSetState()方法设置失败,继续调用acquire()方法获取资源。
如果抢占资源成功,调用tryAcquire()方法返回true,判断条件结束,继续执行业务。
如果抢占资源失败,继续判断acquireQueued()方法返回。执行addWaiter()方法并传入参数表示使用独占模式将线程加入到等待队列。
5.3.2 抢占资源
线程C进入,继续调用acquire()方法获取资源,执行tryAcquire()方法。
继续调用非公平锁的nonfairTryAcquire()方法,返回false表示占用失败。
5.3.3 进入等待
线程C进入,继续调用addWaiter()方法将当前线程加入等待队列。
因为线程C是第二个进入等待的线程,线程B已经完成了队列初始化,尾节点不为空,将当前节点作为新的尾节点。
5.3.4 阻塞线程
线程C得到当前节点后,作为参数传入acquireQueued()方法继续执行。
因为线程C是第二个进入等待的线程,上一节点不为头节点,直接进入判断。
在判断条件中调用shouldParkAfterFailedAcquire()方法,处理前置节点,将B节点的等待状态设为-1,返回true,继续判断parkAndCheckInterrupt()方法。
在判断条件中调用parkAndCheckInterrupt()方法,阻塞当前节点。
线程C在此被阻塞。
5.4 线程A结束
5.4.1 解锁资源
线程A执行完毕,调用unlock()方法释放资源并唤醒线程,查看实现:
1 public void unlock() { 2 sync.release(1); 3 }
继续查看release()方法:
1 public final boolean release(int arg) { 2 // 调用tryRelease()方法尝试释放资源 3 if (tryRelease(arg)) { 4 // 获取头节点 5 Node h = head; 6 // 如果头节点不为空,并且等待状态不为0,表示需要唤醒其他线程 7 if (h != null && h.waitStatus != 0) 8 // 调用unparkSuccessor()方法并传入头节点,唤醒线程 9 unparkSuccessor(h); 10 return true; 11 } 12 // 释放失败返回false 13 return false; 14 }
5.4.2 释放资源
继续查看tryRelease()方法:
1 protected final boolean tryRelease(int releases) { 2 // 记录资源状态 3 int c = getState() - releases; 4 // 如果当前线程不为占用线程则抛出异常 5 if (Thread.currentThread() != getExclusiveOwnerThread()) 6 throw new IllegalMonitorStateException(); 7 // 标记资源空闲,默认为false 8 boolean free = false 9 // 资源状态为0则标记资源空闲为true,并将占用线程置空 10 if (c == 0) { 11 free = true; 12 setExclusiveOwnerThread(null); 13 } 14 // 设置资源状态 15 setState(c); 16 // 返回资源空闲 17 return free; 18 }
线程A释放资源并返回true,继续执行。
5.4.3 唤醒线程
因为线程B和线程C已经进入等待队列,所以头节点不为空,继续查看unparkSuccessor()方法:
1 private void unparkSuccessor(Node node) { 2 // 记录头节点的等待状态 3 int ws = node.waitStatus; 4 // 如果头节点的等待状态小于0,则将头节点的等待状态设为0 5 if (ws < 0) 6 compareAndSetWaitStatus(node, ws, 0); 7 // 记录头节点的下一节点 8 Node s = node.next; 9 // 判断下一节点是否为空或者下一节点的等待状态是否大于0 10 if (s == null || s.waitStatus > 0) { 11 s = null; 12 // 遍历下一节点,找到不为空并且等待状态小于等于0的节点,将其设为下一节点 13 for (Node t = tail; t != null && t != node; t = t.prev) 14 if (t.waitStatus <= 0) 15 s = t; 16 } 17 // 如果下一节点不为空,则使用LockSupport的unpark()方法唤醒下一节点中的线程 18 if (s != null) 19 LockSupport.unpark(s.thread); 20 }
头节点的下一节点为线程B所在的节点,线程B被唤醒。
5.5 线程B执行并结束
5.5.1 抢占资源
线程B在parkAndCheckInterrupt()方法中被释放后,返回中断状态为false,重新进入自旋:
1 final boolean acquireQueued(final Node node, int arg) { 2 // 记录当前节点是否取消,默认为true,表示取消 3 boolean failed = true; 4 try { 5 // 标记当前节点是否中断,默认为false,表示当前节点没有中断 6 boolean interrupted = false 7 // 自旋 8 for (;;) { 9 // 获取当前节点的上一节点 10 final Node p = node.predecessor(); 11 // 如果当前节点是头节点,表示当前节点即将被唤醒,尝试抢占资源 12 if (p == head && tryAcquire(arg)) { 13 // 将当前节点设为头节点,置空当前节点的上一节点,并取消当前节点同当前线程的绑定 14 setHead(node); 15 // 将原头节点的下一节点置空,方便GC回收 16 p.next = null; // help GC 17 // 标记当前节点为false,表示没有取消 18 failed = false; 19 // 返回false,表示当前节点没有中断 20 return interrupted; 21 } 22 // 不管当前节点是不是头节点,执行到这里就表示获取资源失败,处理前置节点并阻塞当前节点 23 if (shouldParkAfterFailedAcquire(p, node) && 24 parkAndCheckInterrupt()) 25 // 标记为true,表示当前节点中断 26 interrupted = true; 27 } 28 } finally { 29 // 当前节点如果被取消,执行取消操作 30 if (failed) 31 cancelAcquire(node); 32 } 33 }
因为线程B的上一节点为头节点,进入tryAcquire()方法抢占资源,抢占成功返回true并将当前节点设为头节点,同时解除同线程B的绑定。
5.5.2 解锁资源
线程B执行完毕,调用unlock()方法释放资源并唤醒线程。
头节点的下一节点为线程C所在的节点,线程C被唤醒。
5.6 线程C执行并结束
5.6.1 抢占资源
线程C在parkAndCheckInterrupt()方法中被释放后,返回中断状态为false,重新进入自旋。
因为线程C的上一节点为头节点,进入tryAcquire()方法抢占资源,抢占成功返回true并将当前节点设为头节点,同时解除同线程C的绑定。
5.6.2 解锁资源
线程C执行完毕,调用unlock()方法释放资源并唤醒线程。
头节点的下一节点为空,不会有任何线程被唤醒。
6 公平锁与非公平锁
6.1 非公平锁
非公平锁的线程在获取资源时,会尝试获取资源,如果成功则立刻占用资源,如果失败则尝试占用资源。
在资源可用时不会判断当前队列是否有线程在等待,也就是说刚加入的线程可以同唤醒的线程竞争资源。
1 final void lock() { 2 if (compareAndSetState(0, 1)) 3 setExclusiveOwnerThread(Thread.currentThread()); 4 else 5 acquire(1); 6 } 7 ... 8 public final void acquire(int arg) { 9 if (!tryAcquire(arg) && 10 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 11 selfInterrupt(); 12 } 13 ... 14 protected final boolean tryAcquire(int acquires) { 15 return nonfairTryAcquire(acquires); 16 } 17 ... 18 final boolean nonfairTryAcquire(int acquires) { 19 final Thread current = Thread.currentThread(); 20 int c = getState(); 21 if (c == 0) { 22 if (compareAndSetState(0, acquires)) { 23 setExclusiveOwnerThread(current); 24 return true; 25 } 26 } 27 else if (current == getExclusiveOwnerThread()) { 28 int nextc = c + acquires; 29 if (nextc < 0) // overflow 30 throw new Error("Maximum lock count exceeded"); 31 setState(nextc); 32 return true; 33 } 34 return false; 35 }
6.2 公平锁
公平锁的线程在获取资源时,不会尝试获取资源,而是尝试占用资源。
在资源可用时会判断当前队列是否有线程在等待,刚加入的线程不可用同唤醒的线程竞争资源。
1 final void lock() { 2 acquire(1); 3 } 4 ... 5 public final void acquire(int arg) { 6 if (!tryAcquire(arg) && 7 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 8 selfInterrupt(); 9 } 10 ... 11 protected final boolean tryAcquire(int acquires) { 12 final Thread current = Thread.currentThread(); 13 int c = getState(); 14 if (c == 0) { 15 if (!hasQueuedPredecessors() && 16 compareAndSetState(0, acquires)) { 17 setExclusiveOwnerThread(current); 18 return true; 19 } 20 } 21 else if (current == getExclusiveOwnerThread()) { 22 int nextc = c + acquires; 23 if (nextc < 0) 24 throw new Error("Maximum lock count exceeded"); 25 setState(nextc); 26 return true; 27 } 28 return false; 29 }
6.3 分析hasQueuedPredecessors()方法
比较NonfairSync和FairSync中tryAcquire()方法的实现,发现FairSync中的tryAcquire()方法中多了一项判断:
1 !hasQueuedPredecessors()
hasQueuedPredecessors()方法用于公平锁加锁时判断等待队列中是否存在有效节点的方法。该方法用于判断当前节点前是否有其他节点排队:
返回false表示没有,取反后为true表示当前节点不需要排队,需要继续执行占用资源的操作。
返回true表示有,取反后为false表示当前节点需要排队,需要执行加入等待队列的操作。
查看AQS中定义的hasQueuedPredecessors()方法:
1 public final boolean hasQueuedPredecessors() { 2 // The correctness of this depends on head being initialized 3 // before tail and on head.next being accurate if the current 4 // thread is first in queue. 5 Node t = tail; // Read fields in reverse initialization order 6 Node h = head; 7 Node s; 8 return h != t && 9 ((s = h.next) == null || s.thread != Thread.currentThread()); 10 }
判断h是否不等于t,如果不成立,说明h等于t,说明头节点和尾节点相同,说明当前队列未初始化(头节点和尾节点都是空节点)或者当前队列只有一个节点(头结点和尾节点都是空节点),说明不需要排队,返回false,取反后为true,尝试占用资源。
判断h是否不等于t,如果成立,说明h不等于t,说明存在两个不同节点。继续判断头节点的下一节点是否为空节点,如果成立,说明下一节点为空,可能上个线程在执行初始化enq()方法,刚刚通过CAS操作compareAndSetHead()将头节点初始化,尚未给尾节点赋值,此时头节点不为空,尾节点为空,并且头节点的下一节点为空,返回true,取反后为false,需要排队。
判断h是否不等于t,如果成立,说明h不等于t,说明存在两个不同节点。继续判断头节点的下一节点是否为空节点,如果不成立,说明下一节点不为空。继续判断下一节点封装的线程是否不等于当前线程,如果成立,说明下一线程不为当前线程,返回true,取反后为false,需要排队。
判断h是否不等于t,如果成立,说明h不等于t,说明存在两个不同节点。继续判断头节点的下一节点是否为空节点,如果不成立,说明下一节点不为空。继续判断下一节点封装的线程是否不等于当前线程,如果不成立,说明下一线程为当前线程,返回false,取反后为true,尝试占用资源。
7 自定义同步器
7.1 实现方法
不同的自定义同步器争用共享资源的方式也不同,自定义同步器在实现时只需要实现共享资源state的获取与释放即可,至于具体线程等待队列的维护(如获取资源失败入队和唤醒出队等),AQS已经在底层实现好了。
自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
7.2 举例说明
7.2.1 ReentrantLock
以ReentrantLock为例,state初始化为0,表示未锁定状态。
当线程A调用lock()方法获取资源时,会调用tryAcquire()占用资源,并将state的值加1。
此后,其他线程再tryAcquire()时就会失败,直到线程A调用unlock()方法释放资源,并将state的值减0,其它线程才有机会获取该锁。
当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证state是能回到零态的。
7.2.2 CountDownLatch
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(N与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后都会调用一次countDown()方法,使用CAS操作将state值减1。等到所有子线程都执行完后,state的值变为0,这时会调用unpark()方法唤醒主线程,然后主线程就会从await()方法唤醒,继续后余动作。
标签:012AQS,Node,节点,int,011Java,线程,发包,final,资源 来源: https://www.cnblogs.com/zhibiweilai/p/15381329.html