编程语言
首页 > 编程语言 > JDK15 —— AQS 源码

JDK15 —— AQS 源码

作者:互联网

使用方法

通用 API

需要子类覆盖的流程方法

源码

/**
 * Excerpt showing the public acquire/release API of the synchronizer and
 * the hook methods that subclasses override.
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    /* ============================ General API ============================ */
    // Every acquire variant first tries the subclass hook once and, when that
    // fails, falls through to the internal six-argument acquire(...) routine.

    /** Exclusive acquire; neither interruptible nor timed. */
    public final void acquire(int arg) {
        if (tryAcquire(arg)) {
            return;
        }
        acquire(null, arg, false, false, false, 0L);
    }

    /**
     * Exclusive acquire that can be interrupted.
     *
     * @throws InterruptedException if the thread is interrupted on entry, or
     *         if the internal acquire returns a negative result
     */
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquire(arg)) {
            return;
        }
        if (acquire(null, arg, false, true, false, 0L) < 0) {
            throw new InterruptedException();
        }
    }

    /**
     * Exclusive acquire with a timeout.
     *
     * @return {@code true} if acquired; {@code false} if the timeout is not
     *         positive or the internal acquire returns 0
     * @throws InterruptedException if the thread is interrupted on entry, or
     *         if the internal acquire returns a negative result
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquire(arg)) {
            return true;
        }
        if (nanosTimeout <= 0L) {
            return false;
        }
        final long deadline = System.nanoTime() + nanosTimeout;
        final int outcome = acquire(null, arg, false, true, true, deadline);
        if (outcome > 0) {
            return true;
        }
        if (outcome == 0) {
            return false;
        }
        throw new InterruptedException();
    }

    /** Shared acquire; neither interruptible nor timed. */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) >= 0) {
            return;
        }
        acquire(null, arg, true, false, false, 0L);
    }

    /**
     * Shared acquire that can be interrupted.
     *
     * @throws InterruptedException if the thread is interrupted on entry, or
     *         if the internal acquire returns a negative result
     */
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquireShared(arg) >= 0) {
            return;
        }
        if (acquire(null, arg, true, true, false, 0L) < 0) {
            throw new InterruptedException();
        }
    }

    /**
     * Shared acquire with a timeout.
     *
     * @return {@code true} if acquired; {@code false} if the timeout is not
     *         positive or the internal acquire returns 0
     * @throws InterruptedException if the thread is interrupted on entry, or
     *         if the internal acquire returns a negative result
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (tryAcquireShared(arg) >= 0) {
            return true;
        }
        if (nanosTimeout <= 0L) {
            return false;
        }
        final long deadline = System.nanoTime() + nanosTimeout;
        final int outcome = acquire(null, arg, true, true, true, deadline);
        if (outcome > 0) {
            return true;
        }
        if (outcome == 0) {
            return false;
        }
        throw new InterruptedException();
    }

    /** Exclusive release; signals the node after head when tryRelease succeeds. */
    public final boolean release(int arg) {
        if (!tryRelease(arg)) {
            return false;
        }
        signalNext(head);
        return true;
    }

    /** Shared release; signals the node after head when tryReleaseShared succeeds. */
    public final boolean releaseShared(int arg) {
        if (!tryReleaseShared(arg)) {
            return false;
        }
        signalNext(head);
        return true;
    }

    /* ================= Hook methods for subclasses to override ================= */
    // Each default implementation throws UnsupportedOperationException.

    /** Exclusive-mode acquire hook. */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /** Shared-mode acquire hook; the callers above treat a negative result as failure. */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /** Exclusive-mode release hook. */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /** Shared-mode release hook. */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /** Hook reporting whether the synchronizer is held in exclusive mode. */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
}

数据结构

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    	                                         implements java.io.Serializable {
    private transient volatile Node head;   // head of the wait queue, lazily initialized
    private transient volatile Node tail;   // tail of the wait queue; after initialization, modified only via casTail
    private volatile int state;             // synchronization state, accessed via getState/setState/compareAndSetState
    
    // Unsafe handle and the field offsets used for the CAS operations below
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final long STATE = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state");
    private static final long HEAD = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "head");
    private static final long TAIL = U.objectFieldOffset(AbstractQueuedSynchronizer.class, "tail");
	
    // Atomically replaces tail with v if it currently equals c.
    private boolean casTail(Node c, Node v) { return U.compareAndSetReference(this, TAIL, c, v); }
    protected final int getState() { return state; }
    protected final void setState(int newState) { state = newState; }
    // Atomically sets state to update if it currently equals expect.
    protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSetInt(this, STATE, expect, update);
    }  
    
    // Node status bits
    static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative
    static final int COND      = 2;          // in a condition wait
    
    // Base class of all queue nodes
    abstract static class Node {
	volatile Node prev;       // predecessor link
	volatile Node next;       // successor link
	Thread waiter;            // thread that signalNext/enqueue unpark
	volatile int status;      // combination of the WAITING / COND / CANCELLED bits above
        // methods for atomic operations
        // for cleanQueue
        final boolean casPrev(Node c, Node v) { return U.weakCompareAndSetReference(this, PREV, c, v); }
        // for cleanQueue
        final boolean casNext(Node c, Node v) { return U.weakCompareAndSetReference(this, NEXT, c, v); }
        // for signalling: atomically clears the bits in v and returns the previous status
        final int getAndUnsetStatus(int v) { return U.getAndBitwiseAndInt(this, STATUS, ~v); }
        // for off-queue assignment
        final void setPrevRelaxed(Node p) { U.putReference(this, PREV, p); }
        // for off-queue assignment
        final void setStatusRelaxed(int s) { U.putInt(this, STATUS, s); }
        // for reducing unneeded signals
        final void clearStatus() { U.putIntOpaque(this, STATUS, 0); }

        private static final long STATUS = U.objectFieldOffset(Node.class, "status");
        private static final long NEXT = U.objectFieldOffset(Node.class, "next");
        private static final long PREV = U.objectFieldOffset(Node.class, "prev");
	}
	// Node type used for exclusive-mode waiters
	static final class ExclusiveNode extends Node { }
	// Node type used for shared-mode waiters
	static final class SharedNode extends Node { }
	// Node type used on condition queues
	static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
    	        ConditionNode nextWaiter;   // link to the next node on the condition queue
		// True when status <= 1 (0, WAITING, or negative) or the current thread is interrupted.
		public final boolean isReleasable() {
			return status <= 1 || Thread.currentThread().isInterrupted();
		}
		// Parks repeatedly until isReleasable() holds; always returns true.
		public final boolean block() {
			while (!isReleasable()) LockSupport.park();
			return true;
		}
	}
}

获取锁——acquire (公共方法)

/**
 * Main acquire loop shared by all exported acquire methods and by the
 * condition-wait methods (which pass their own node).
 *
 * @param node          null unless the caller already owns a node (the await
 *                      methods pass their ConditionNode)
 * @param arg           the acquire argument forwarded to tryAcquire / tryAcquireShared
 * @param shared        true for shared mode, false for exclusive mode
 * @param interruptible if true, an interrupt aborts the wait and yields a negative result
 * @param timed         if true, the wait is bounded by {@code time}
 * @param time          deadline in System.nanoTime() units (only read when timed)
 * @return 1 if acquired; otherwise the result of cancelAcquire
 *         (0 on timeout, negative CANCELLED if interrupted in interruptible mode)
 */
final int acquire(Node node, int arg, boolean shared, boolean interruptible, 
                  boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    /*
     * Repeatedly:
     *  Check if node now first
     *    if so, ensure head stable, else ensure valid predecessor
     *  if node is first or not yet enqueued, try acquiring
     *  else if node not yet created, create it
     *  else if not yet enqueued, try once to enqueue
     *  else if woken from park, retry (up to postSpins times)
     *  else if WAITING status not set, set and retry
     *  else park and clear WAITING status, and check cancellation
     */
	// Loop until acquired; a timeout or an interruptible interrupt breaks out to cancel.
    for (;;) {
        if (!first &&    // once node is known to be first, this whole check is skipped
            (pred = (node == null) ? null : node.prev) != null &&  // non-null prev: node has been linked into the queue
            !(first = (head == pred))) {  // first = node's predecessor is the current head
            if (pred.status < 0) { // negative status: the predecessor was cancelled
                cleanQueue();           // predecessor cancelled: unlink cancelled nodes, then re-check
                continue;
            } else if (pred.prev == null) {  // pred has no predecessor but was not observed as head: spin and re-check
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        
        // Try to acquire when node is first in the queue or is not enqueued yet
        // (the not-yet-enqueued attempt lets a newcomer try before joining the queue).
        if (first || pred == null) {  // attempt the acquire
            boolean acquired;
            try {
                if (shared) acquired = (tryAcquireShared(arg) >= 0);
                else acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                // The hook threw: cancel this node and propagate the exception
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            // Acquire succeeded
            if (acquired) {
                // If node was first in the queue, it becomes the new head
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared) signalNextIfShared(node); // wake the next node if it is a shared node
                    if (interrupted) current.interrupt(); // re-assert the interrupt that was cleared while waiting
                }
                return 1;
            }
        }
        
        // node == null: no node allocated yet; only create it here, then loop again
        if (node == null) {                 // allocate; retry before enqueue
            if (shared) node = new SharedNode();
            else node = new ExclusiveNode();
        } 
        // node exists but is not linked yet: try once to append it to the queue
        else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            // Queue not initialized yet: tryInitializeHead installs a new empty ExclusiveNode
            // as head and tail. node.prev is still null, so the next iteration retries.
            if (t == null) tryInitializeHead(); 
            // casTail(t, node) tries to make node the new tail.
            // Note the negation: on CAS success control reaches the final else (t.next = node);
            // on CAS failure prev is reset to null and the loop retries.
            else if (!casTail(t, node)) node.setPrevRelaxed(null);  // back out
            else t.next = node;
        } else if (first && spins != 0) {   // spin instead of parking again right away
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {      // status must be set to WAITING before parking
            node.status = WAITING;          // enable signal and recheck
        } else {                            // acquire failed and WAITING is set: park
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed) LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L) 
                LockSupport.parkNanos(this, nanos);
            else break; // deadline passed: leave the loop and cancel
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break; // interrupted in interruptible mode: leave the loop and cancel
        }
    }
    // Reached only via the two breaks above (timeout or interrupt): cancel the node
    return cancelAcquire(node, interrupted, interruptible);
}

	// Initializes the queue: tries to install a fresh empty ExclusiveNode as head;
	// only the thread whose CAS succeeds also publishes it as tail.
    private void tryInitializeHead() {
        final Node dummy = new ExclusiveNode();
        final boolean installed = U.compareAndSetReference(this, HEAD, null, dummy);
        if (installed) {
            tail = dummy;
        }
    }
	// Wakes the successor of h, but only if that successor is a SharedNode
	// with a non-zero status.
    private static void signalNextIfShared(Node h) {
        if (h == null) {
            return;
        }
        final Node succ = h.next;
        if (succ == null || !(succ instanceof SharedNode) || succ.status == 0) {
            return;
        }
        succ.getAndUnsetStatus(WAITING);  // clear the WAITING bit
        LockSupport.unpark(succ.waiter);
    }
	// Unlinks cancelled (negative-status) nodes from the queue.
	// Walks backwards from tail over (p, q, s) = (predecessor, current, successor)
	// triples. Every `break` restarts the walk from tail via the outer loop; the
	// method returns only when the walk reaches a null node or a node whose prev is null.
    private void cleanQueue() {
        for (;;) {                               // restart point
            for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
                if (q == null || (p = q.prev) == null)
                    return;                      // end of list
                if (s == null ? tail != q : (s.prev != q || s.status < 0))
                    break;                       // inconsistent
                if (q.status < 0) {              // cancelled
                    // Splice q out: swing tail (when q is last) or s.prev back to p,
                    // then best-effort fix p.next.
                    if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
                        q.prev == p) {
                        p.casNext(q, s);         // OK if fails
                        if (p.prev == null)      // p has no predecessor: signal the node after it
                            signalNext(p);
                    }
                    break;
                }
                if ((n = p.next) != q) {         // help finish
                    if (n != null && q.prev == p) {
                        p.casNext(n, q);
                        if (p.prev == null)      // p has no predecessor: signal the node after it
                            signalNext(p);
                    }
                    break;
                }
                s = q;                           // step one node towards the head
                q = q.prev;
            }
        }
    }
	// Cancels an ongoing acquire attempt: marks the node (if any) CANCELLED and,
	// when it is linked into the queue, cleans the queue. Returns CANCELLED when
	// interrupted in interruptible mode; otherwise re-asserts any interrupt and
	// returns 0.
    private int cancelAcquire(Node node, boolean interrupted, boolean interruptible) {
        if (node != null) {
            node.waiter = null;
            node.status = CANCELLED;
            final boolean linked = node.prev != null;
            if (linked) {
                cleanQueue();
            }
        }
        if (!interrupted) {
            return 0;
        }
        if (interruptible) {
            return CANCELLED;
        }
        Thread.currentThread().interrupt();
        return 0;
    }

释放锁——signalNext(通知下一个节点)

    /**
     * Exclusive-mode release: when tryRelease succeeds, signals the node
     * following head.
     *
     * @return the value returned by tryRelease
     */
    public final boolean release(int arg) {
        final boolean released = tryRelease(arg);
        if (released) {
            signalNext(head);
        }
        return released;
    }
    /**
     * Shared-mode release: when tryReleaseShared succeeds, signals the node
     * following head.
     *
     * @return the value returned by tryReleaseShared
     */
    public final boolean releaseShared(int arg) {
        final boolean released = tryReleaseShared(arg);
        if (released) {
            signalNext(head);
        }
        return released;
    }
	// Wakes the successor of h, if there is one and its status is non-zero.
    private static void signalNext(Node h) {
        if (h == null) {
            return;
        }
        final Node succ = h.next;
        if (succ == null || succ.status == 0) {
            return;
        }
        succ.getAndUnsetStatus(WAITING); // clear the WAITING bit
        LockSupport.unpark(succ.waiter);
    }

等待队列—— ConditionObject

使用方法——Condition 接口

数据结构

/**
 * Excerpt showing the data structures behind condition waits: the
 * ConditionObject that owns the list, and the node type stored in it.
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    /** Condition implementation; holds the two ends of its list of waiting nodes. */
    public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of the condition queue. */
        private transient ConditionNode firstWaiter;
        /** Last node of the condition queue. */
        private transient ConditionNode lastWaiter;

        public ConditionObject() { }
    }

    /** Node placed on a condition queue; nodes are chained through nextWaiter. */
    static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
        /** Next node in the singly linked condition queue. */
        ConditionNode nextWaiter;

        /** True when status is at most 1, or when the current thread is interrupted. */
        public final boolean isReleasable() {
            if (status <= 1) {
                return true;
            }
            return Thread.currentThread().isInterrupted();
        }

        /** Parks until isReleasable() holds; always returns true. */
        public final boolean block() {
            for (;;) {
                if (isReleasable()) {
                    return true;
                }
                LockSupport.park();
            }
        }
    }
}

等待

await

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    	                                         implements java.io.Serializable {
	public class ConditionObject implements Condition, java.io.Serializable {
        /**
         * Interruptible condition wait; blocks through ForkJoinPool.managedBlock.
         *
         * @throws InterruptedException if the thread is interrupted on entry, or is
         *         interrupted while the node's COND bit is still set
         * @throws IllegalMonitorStateException from enableWait, if the synchronizer
         *         is not held exclusively or cannot be released
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted()) throw new InterruptedException();
            ConditionNode node = new ConditionNode();
            int savedState = enableWait(node);   // append node to the condition queue and release the lock
            LockSupport.setCurrentBlocker(this); // for back-compatibility; only sets the blocker, the blocking happens in the loop below
            boolean interrupted = false, cancelled = false;
            while (!canReacquire(node)) { // loop until the node has been linked into the sync queue
                if (interrupted |= Thread.interrupted()) {
                    // COND still set: the interrupt won the race against any signal, so the wait is cancelled
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;              // else interrupted after signal
                } else if ((node.status & COND) != 0) {
                    try {
                        ForkJoinPool.managedBlock(node); // block using the node's ManagedBlocker implementation
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                } else Thread.onSpinWait();    // awoke while enqueuing
            }
            LockSupport.setCurrentBlocker(null); // clear the blocker set above
            // Reacquire the lock with the saved state
            node.clearStatus();
            acquire(node, savedState, false, false, false, 0L);
            if (interrupted) {
                if (cancelled) {
                    unlinkCancelledWaiters(node);
                    throw new InterruptedException();
                }
                Thread.currentThread().interrupt(); // interrupted after being signalled: re-assert instead of throwing
            }
        }
		// Prepares a condition wait: appends node to the condition queue and fully
		// releases the synchronizer. Returns the state that was held, so the caller
		// can reacquire with it later.
        private int enableWait(ConditionNode node) {
            if (isHeldExclusively()) { // only valid while held exclusively
                node.waiter = Thread.currentThread();
                node.setStatusRelaxed(COND | WAITING); // mark the node as waiting on a condition
                ConditionNode last = lastWaiter;
                if (last == null) firstWaiter = node;
                else last.nextWaiter = node;
                lastWaiter = node;
                int savedState = getState();
                if (release(savedState))
                    return savedState;
            }
            // Reached when the synchronizer is not held exclusively or release(savedState) failed
            node.status = CANCELLED; // lock not held or inconsistent
            throw new IllegalMonitorStateException();
        }
        
		// Returns true if a node that was initially placed on the condition queue
		// is now on the sync queue, ready to reacquire.
        private boolean canReacquire(ConditionNode node) {
            // check links, not status to avoid enqueue race
            return node != null && node.prev != null && isEnqueued(node);
        }
    }
    
	// Returns true if node is found by traversing the queue backwards from tail.
	final boolean isEnqueued(Node node) {
        for (Node t = tail; t != null; t = t.prev)
            if (t == node) return true;
        return false;
    }
}

awaitNanos

/**
 * Timed condition wait; blocks with LockSupport.parkNanos.
 *
 * @param nanosTimeout maximum time to wait in nanoseconds; negative values are treated as 0
 * @return the time left until the deadline (deadline - System.nanoTime()), or
 *         Long.MIN_VALUE when that difference exceeds nanosTimeout
 * @throws InterruptedException if interrupted on entry, or if the wait was
 *         cancelled and an interrupt was observed
 */
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    ConditionNode node = new ConditionNode();
    int savedState = enableWait(node);
    long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
    long deadline = System.nanoTime() + nanos;
    boolean cancelled = false, interrupted = false;
    while (!canReacquire(node)) { // loop until the node is on the sync queue
        if ((interrupted |= Thread.interrupted()) ||
            (nanos = deadline - System.nanoTime()) <= 0L) {
            // Interrupted or timed out: cancelled only if COND was still set (not signalled yet)
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;
        } else LockSupport.parkNanos(this, nanos);
    }
    node.clearStatus();
    acquire(node, savedState, false, false, false, 0L); // reacquire with the saved state
    if (cancelled) {
        unlinkCancelledWaiters(node);
        if (interrupted) throw new InterruptedException();
    } else if (interrupted) Thread.currentThread().interrupt(); // not cancelled: re-assert instead of throwing
    long remaining = deadline - System.nanoTime(); // avoid overflow
    return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
}

awaitUntil

/**
 * Condition wait bounded by an absolute deadline; blocks with LockSupport.parkUntil.
 *
 * @param deadline absolute time to wait until; compared against System.currentTimeMillis()
 * @return false if the wait was cancelled (by deadline or interrupt) while the
 *         node's COND bit was still set, true otherwise
 * @throws InterruptedException if interrupted on entry, or if the wait was
 *         cancelled and an interrupt was observed
 */
public final boolean awaitUntil(Date deadline) throws InterruptedException {
    long abstime = deadline.getTime();
    if (Thread.interrupted()) throw new InterruptedException();
    ConditionNode node = new ConditionNode();
    int savedState = enableWait(node);
    boolean cancelled = false, interrupted = false;
    while (!canReacquire(node)) { // loop until the node is on the sync queue
        if ((interrupted |= Thread.interrupted()) ||
            System.currentTimeMillis() >= abstime) {
            // Interrupted or past the deadline: cancelled only if COND was still set
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;
        } else LockSupport.parkUntil(this, abstime);
    }
    node.clearStatus();
    acquire(node, savedState, false, false, false, 0L); // reacquire with the saved state
    if (cancelled) {
        unlinkCancelledWaiters(node);
        if (interrupted) throw new InterruptedException();
    } else if (interrupted) Thread.currentThread().interrupt(); // not cancelled: re-assert instead of throwing
    return !cancelled;
}

await(long,TimeUnit)

/**
 * Timed condition wait with an explicit unit; blocks with LockSupport.parkNanos.
 *
 * @param time maximum time to wait; once converted to nanoseconds, negative values are treated as 0
 * @param unit unit of {@code time}
 * @return false if the wait was cancelled (by timeout or interrupt) while the
 *         node's COND bit was still set, true otherwise
 * @throws InterruptedException if interrupted on entry, or if the wait was
 *         cancelled and an interrupt was observed
 */
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted()) throw new InterruptedException();
    ConditionNode node = new ConditionNode();
    int savedState = enableWait(node);
    long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
    long deadline = System.nanoTime() + nanos;
    boolean cancelled = false, interrupted = false;
    while (!canReacquire(node)) { // loop until the node is on the sync queue
        if ((interrupted |= Thread.interrupted()) ||
            (nanos = deadline - System.nanoTime()) <= 0L) {
            // Interrupted or timed out: cancelled only if COND was still set
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;
        } else LockSupport.parkNanos(this, nanos);
    }
    node.clearStatus();
    acquire(node, savedState, false, false, false, 0L); // reacquire with the saved state
    if (cancelled) {
        unlinkCancelledWaiters(node);
        if (interrupted) throw new InterruptedException();
    } else if (interrupted) Thread.currentThread().interrupt(); // not cancelled: re-assert instead of throwing
    return !cancelled;
}

signal

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    	                                         implements java.io.Serializable {
	public class ConditionObject implements Condition, java.io.Serializable {
		// Moves one waiting node, if any, from the condition queue to the sync queue.
		// Note that firstWaiter is read before the ownership check.
		public final void signal() {
            ConditionNode first = firstWaiter;
            if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // not held exclusively: reject
            if (first != null) doSignal(first, false);
        }
		// Moves all waiting nodes from the condition queue to the sync queue.
		public final void signalAll() {
            ConditionNode first = firstWaiter;
            if (!isHeldExclusively())  throw new IllegalMonitorStateException();
            if (first != null) doSignal(first, true); // all = true
        }

        // Removes nodes from the front of the condition queue one at a time. A node
        // whose COND bit was still set has it cleared and is enqueued on the sync
        // queue; a node whose COND bit was already cleared is just dropped from the list.
        private void doSignal(ConditionNode first, boolean all) {
            while (first != null) {
                ConditionNode next = first.nextWaiter;
                if ((firstWaiter = next) == null)
                    lastWaiter = null;          // the list became empty
                if ((first.getAndUnsetStatus(COND) & COND) != 0) {
                    enqueue(first);  // move the node onto the sync queue
                    if (!all) break; // signal(): stop after the first transferred node
                }
                first = next;
            }
        }
    }
	
    // Appends node to the tail of the sync queue, retrying until the tail CAS succeeds.
    final void enqueue(Node node) {
        if (node != null) {
            for (;;) {
                Node t = tail;
                node.setPrevRelaxed(t);        // avoid unnecessary fence
                if (t == null)                 // initialize
                    tryInitializeHead();
                else if (casTail(t, node)) {
                    t.next = node;
                    if (t.status < 0)          // wake up to clean link
                        LockSupport.unpark(node.waiter);
                    break;
                }
            }
        }
    }
}

自定义锁

  1. 内部静态类 Sync 继承 AQS ,实现自定义功能
    1. 独占式
    2. 共享式
  2. 实现 Lock 接口

显式锁要在 finally 块中释放锁。

标签:node,Node,AQS,int,JDK15,interrupted,源码,null,final
来源: https://www.cnblogs.com/xch-jiang/p/14615138.html