JDK15 —— AQS 源码
作者:互联网
使用方法
通用 API
-
独占式获取
accquire
acquireInterruptibly
tryAcquireNanos
-
共享式获取
acquireShared
acquireSharedInterruptibly
tryAcquireSharedNanos
-
独占式释放锁
release
-
共享式释放锁
releaseShared
需要子类覆盖的流程方法
-
独占式获取
tryAcquire
-
共享式获取
tryAcquireShared
-
独占式释放
tryRelease
-
共享式释放
tryReleaseShared
-
这个同步器是否处于独占模式
isHeldExclusively
源码
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/***************************** 通用 API ******************************/
// 获取锁,都是调用的 acquire 方法
// 独占式获取
public final void acquire(int arg) {
if (!tryAcquire(arg)) acquire(null, arg, false, false, false, 0L);
}
// 独占式获取,可中断
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted() ||
(!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
}
// 独占式获取,可超时
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (!Thread.interrupted()) {
if (tryAcquire(arg))
return true;
if (nanosTimeout <= 0L)
return false;
int stat = acquire(null, arg, false, true, true, System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}
// 共享式获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) acquire(null, arg, true, false, false, 0L);
}
// 共享式获取,可中断
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 && acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
// 共享式获取,可超时
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (!Thread.interrupted()) {
if (tryAcquireShared(arg) >= 0)
return true;
if (nanosTimeout <= 0L)
return false;
int stat = acquire(null, arg, true, true, true, System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}
// 独占式释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
// 共享式释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
/*****************************需要子类覆盖的流程方法******************************/
// 全是 UnsupportedOperationException 异常
// 独占式获取
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
// 共享式获取
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
// 独占式释放
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
// 共享式释放
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
// 是否处于独占模式
protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
}
数据结构
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head; // 等待队列头节点,初始化懒加载
private transient volatile Node tail; // 等待队列尾节点,初始化后只能通过 casTail 修改
private volatile int state;
// Unsafe
private static final Unsafe U = Unsafe.getUnsafe();
private static final long STATE = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state");
private static final long HEAD = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head");
private static final long TAIL = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail");
private boolean casTail(Node c, Node v) { return U.compareAndSetReference(this, TAIL, c, v); }
protected final int getState() { return state; }
protected final void setState(int newState) { state = newState; }
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
// Node 节点状态位
static final int WAITING = 1; // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND = 2; // in a condition wait
// 节点的数据结构
abstract static class Node {
volatile Node prev;
volatile Node next;
Thread waiter;
volatile int status;
// methods for atomic operations
// for cleanQueue
final boolean casPrev(Node c, Node v) { return U.weakCompareAndSetReference(this, PREV, c, v); }
// for cleanQueue
final boolean casNext(Node c, Node v) { return U.weakCompareAndSetReference(this, NEXT, c, v); }
// for signalling
final int getAndUnsetStatus(int v) { return U.getAndBitwiseAndInt(this, STATUS, ~v); }
// for off-queue assignment
final void setPrevRelaxed(Node p) { U.putReference(this, PREV, p); }
// for off-queue assignment
final void setStatusRelaxed(int s) { U.putInt(this, STATUS, s); }
// for reducing unneeded signals
final void clearStatus() { U.putIntOpaque(this, STATUS, 0); }
private static final long STATUS = U.objectFieldOffset(Node.class, "status");
private static final long NEXT = U.objectFieldOffset(Node.class, "next");
private static final long PREV = U.objectFieldOffset(Node.class, "prev");
}
// 独占节点
static final class ExclusiveNode extends Node { }
// 共享节点
static final class SharedNode extends Node { }
// 条件队列的节点
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter;
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
}
获取锁——acquire
(公共方法)
final int acquire(Node node, int arg, boolean shared, boolean interruptible,
boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
// 自旋中拿锁,失败就 cancel
for (;;) {
if (!first && // 当前是第一个节点的话,就没要执行这个 if 了
(pred = (node == null) ? null : node.prev) != null && // node 节点是否已经在队列里了
!(first = (head == pred))) { // 判断当前节点是不是队列中的第一个节点
if (pred.status < 0) { // 代表已经取消了的节点
cleanQueue(); // predecessor cancelled,清理队列中已经取消了的节点,清理完 continue
continue;
} else if (pred.prev == null) { // 刚刚不是第一个节点,现在变成第一个节点了,continue
Thread.onSpinWait(); // ensure serialization
continue;
}
}
// 如果是第一个节点或该节点尚未入队(为了实现非公平锁和共享锁),则尝试拿锁
if (first || pred == null) { // 尝试拿锁
boolean acquired;
try {
if (shared) acquired = (tryAcquireShared(arg) >= 0);
else acquired = tryAcquire(arg);
} catch (Throwable ex) {
// 抛出异常,直接 cancel
cancelAcquire(node, interrupted, false);
throw ex;
}
// 如果拿锁成功
if (acquired) {
// 如果是队列中第一个节点,则需要把 head 指向当前节点,
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared) signalNextIfShared(node); // 通知下一个共享节点
if (interrupted) current.interrupt(); // 已经拿到锁了,中断,就直接中断线程
}
return 1;
}
}
// node 为 null,表示要往队列中添加新节点,这里只是给 node new 一个节点,然后自旋
if (node == null) { // allocate; retry before enqueue
if (shared) node = new SharedNode();
else node = new ExclusiveNode();
}
// 这里表示第一次往队列中添加节点
else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
// 如果队列还未初始化,则先初始化,就是 new 一个空的独占节点,让 head 和 tail 指向它
// 自旋时 prev 依然是 null
if (t == null) tryInitializeHead();
// casTail(t, node) 把 node 设置成尾节点
// 注意前面有个 ! ,说明设置成功后会进入下一个 else 块,即 t.next = node;
// CAS 设置失败会进入 if 块,重新把 prev 设为 null,自旋
else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out
else t.next = node;
} else if (first && spins != 0) { // 为了减少 park 的次数,使用自旋
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) { // 先把节点状态改为 WAITING 才能 park
node.status = WAITING; // enable signal and recheck
} else { // 尝试拿锁失败,park
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed) LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else break; // 超时退出,cancel 掉
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break; // 产生中断,cancel 掉
}
}
// 有异常、超时或中断,cancel 掉
return cancelAcquire(node, interrupted, interruptible);
}
// 初始化队列
private void tryInitializeHead() {
Node h = new ExclusiveNode();
if (U.compareAndSetReference(this, HEAD, null, h))
tail = h;
}
// 通知下一个共享节点
private static void signalNextIfShared(Node h) {
Node s;
if (h != null && (s = h.next) != null && (s instanceof SharedNode) && s.status != 0) {
s.getAndUnsetStatus(WAITING); // 取消 WAITING 状态
LockSupport.unpark(s.waiter);
}
}
// 清理队列中 cancel 的节点
private void cleanQueue() {
for (;;) { // restart point
for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
if (q == null || (p = q.prev) == null)
return; // end of list
if (s == null ? tail != q : (s.prev != q || s.status < 0))
break; // inconsistent
if (q.status < 0) { // cancelled
if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
q.prev == p) {
p.casNext(q, s); // OK if fails
if (p.prev == null)
signalNext(p);
}
break;
}
if ((n = p.next) != q) { // help finish
if (n != null && q.prev == p) {
p.casNext(n, q);
if (p.prev == null)
signalNext(p);
}
break;
}
s = q;
q = q.prev;
}
}
}
// cancel
private int cancelAcquire(Node node, boolean interrupted, boolean interruptible) {
if (node != null) {
node.waiter = null;
node.status = CANCELLED;
if (node.prev != null) cleanQueue();
}
if (interrupted) {
if (interruptible) return CANCELLED;
else Thread.currentThread().interrupt();
}
return 0;
}
释放锁——signalNext
(通知下一个节点)
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
// 通知下一个节点
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING); // 取消 WAITING 状态
LockSupport.unpark(s.waiter);
}
}
等待队列—— ConditionObject
使用方法——Condition
接口
- 等待
await()
awaitUninterruptibly()
:不可中断awaitNanos(long nanosTimeout)
:超时,纳秒单位await(long time, TimeUnit unit)
:超时,自定义时间单位awaitUntil(Date deadline)
:超时,一直等到什么时间
- 通知
signal()
signalAll()
数据结构
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
// 保存头节点和尾节点
private transient ConditionNode firstWaiter;
private transient ConditionNode lastWaiter;
public ConditionObject() { }
}
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // 链表,保存下一个节点
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
}
等待
await
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
// 使用 ForkJoinPool.managedBlock
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node); // 等待
LockSupport.setCurrentBlocker(this); // for back-compatibility,暂停当前线程
boolean interrupted = false, cancelled = false;
while (!canReacquire(node)) { // 当前节点在同步队列中,则进入 while 循环
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
ForkJoinPool.managedBlock(node); // 阻塞
} catch (InterruptedException ie) {
interrupted = true;
}
} else Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null); // 取消暂停当前线程
// 获取锁
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
// 等待方法
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) { // 独占模式下才生效
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING); // 条件等待
ConditionNode last = lastWaiter;
if (last == null) firstWaiter = node;
else last.nextWaiter = node;
lastWaiter = node;
int savedState = getState();
if (release(savedState))
return savedState;
}
// 共享锁就没有等待队列,抛出异常
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
// 如果最初放置在条件队列中的节点现在准备在同步队列中重新获取,则返回 true。
private boolean canReacquire(ConditionNode node) {
// check links, not status to avoid enqueue race
return node != null && node.prev != null && isEnqueued(node);
}
}
// 如果从尾部遍历找到了节点,则返回 true
final boolean isEnqueued(Node node) {
for (Node t = tail; t != null; t = t.prev)
if (t == node) return true;
return false;
}
}
awaitNanos
// 使用 parkNanos
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
long deadline = System.nanoTime() + nanos;
boolean cancelled = false, interrupted = false;
while (!canReacquire(node)) {
if ((interrupted |= Thread.interrupted()) ||
(nanos = deadline - System.nanoTime()) <= 0L) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else LockSupport.parkNanos(this, nanos);
}
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (cancelled) {
unlinkCancelledWaiters(node);
if (interrupted) throw new InterruptedException();
} else if (interrupted) Thread.currentThread().interrupt();
long remaining = deadline - System.nanoTime(); // avoid overflow
return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
}
awaitUntil
// 使用 parkUntil
public final boolean awaitUntil(Date deadline) throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted()) throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
boolean cancelled = false, interrupted = false;
while (!canReacquire(node)) {
if ((interrupted |= Thread.interrupted()) ||
System.currentTimeMillis() >= abstime) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else LockSupport.parkUntil(this, abstime);
}
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (cancelled) {
unlinkCancelledWaiters(node);
if (interrupted) throw new InterruptedException();
} else if (interrupted) Thread.currentThread().interrupt();
return !cancelled;
}
await(long,TimeUnit)
// 使用 parkNanos
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted()) throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
long deadline = System.nanoTime() + nanos;
boolean cancelled = false, interrupted = false;
while (!canReacquire(node)) {
if ((interrupted |= Thread.interrupted()) ||
(nanos = deadline - System.nanoTime()) <= 0L) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break;
} else LockSupport.parkNanos(this, nanos);
}
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (cancelled) {
unlinkCancelledWaiters(node);
if (interrupted) throw new InterruptedException();
} else if (interrupted) Thread.currentThread().interrupt();
return !cancelled;
}
signal
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 非独占就抛异常
if (first != null) doSignal(first, false);
}
public final void signalAll() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively()) throw new IllegalMonitorStateException();
if (first != null) doSignal(first, true); // all = true
}
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first); // 入队
if (!all) break; // 只通知一个
}
first = next;
}
}
}
// 节点入队
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
}
自定义锁
- 内部静态类
Sync
继承AQS
,实现自定义功能- 独占式
- 共享式
- 实现
Lock
接口
显示锁要在 finally
块中释放锁。
标签:node,Node,AQS,int,JDK15,interrupted,源码,null,final 来源: https://www.cnblogs.com/xch-jiang/p/14615138.html