其他分享
首页 > 其他分享> > JUC-CAS和AQS

JUC-CAS和AQS

作者:互联网

一、CAS

  CAS, compare and swap比较并替换。 CAS有三个参数:需要读写的内存位值(V)、进行比较的预期原值(A)和拟写入的新值(B)。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。程序在在某个变更新时,会有一个校验逻辑——认为V的值应该是A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少。CAS是一项乐观的技术,它希望能成功地执行更新操作,并且如果有另一个线程在最近一次检查后更新了该变量,那么CAS能检测到这个错误。当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。但是,失败的线程并不会被挂起,而是被告知在这次竞争中失败,并可以多次尝试。这种灵活性可以减少与锁相关的活跃性风险,例如死锁,饥饿等问题。

AtomicInteger源码分析:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    //是使用了Unsafe类来进行CAS操作,valueOffset表示的是value值的偏移地址,Unsafe会根据内存偏移地址获取数据的原值的, 
    //偏移量对应指针指向该变量的内存地址。
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            //内存偏移地址在静态代码块中通过Unsafe的objectFieldOffset方法获取。
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
    //使用volatile修饰,直接从共享内存中操作变量,保证多线程之间看到的value值是同一份
    private volatile int value;

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    //....

   public final int get() {
        return value;  //return操作是一个原子操作,具有原子性
    }

    public final void set(int newValue) {
        value = newValue; //赋值操作是一个原子操作,具有原子性
    }

    /**
    *  更新对应的数据,返回更新前的数据
    *  AtomicInteger atomicInteger = new AtomicInteger(2);
    *  atomicInteger.incrementAndGet(i -> i * 3);
    */
    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            //从内存中读取修改前的值prev,并执行给定函数式计算修改后的值next;
            prev = get();
            next = updateFunction.applyAsInt(prev);
          
          //调用compareAndSet修改value值(内部是调用了unsafe的compareAndSwapInt方法)。
          //如果此时有其他线程也在修改这个value值,那么CAS操作就会失败,继续进入do循环重新获取新值,再次执行CAS直到修改成功。
        } while (!compareAndSet(prev, next));  
        return prev;
    }
}

Unsafe

  Unsafe是实现CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。Unsafe类提供了硬件级别的原子操作(java里面没有指针,但java的sun.misc.Unsafe这个类是一个特殊,使得可以使用指针偏移操作,拥有了类似C语言指针一样操作内存空间的能力)。

public final class Unsafe {
  
  //获取对象o中给定偏移地址(offset)的值。以下相关get方法作用相同
  public native int getInt(Object o, long offset);
  public native Object getObject(Object o, long offset);
  //在对象o的给定偏移地址存储数值x。以下set方法作用相同
  public native void putInt(Object o, long offset, int x);
  public native void putObject(Object o, long offset, Object x);
  
  //获取给定内存地址的一个本地指针
  public native long getAddress(long address);
  //在给定的内存地址处存放一个本地指针x
  public native void putAddress(long address, long x);

  ///------------------内存操作----------------------
  //在本地内存分配一块指定大小的新内存,内存的内容未初始化;它们通常被当做垃圾回收。
  public native long allocateMemory(long bytes);
 //重新分配给定内存地址的本地内存
  public native long reallocateMemory(long address, long bytes);
  //释放给定地址的内存
  public native void freeMemory(long address);
  //获取给定对象的偏移地址
  public native long staticFieldOffset(Field f);
  public native long objectFieldOffset(Field f);

  //------------------数组操作---------------------------------
  //获取给定数组的第一个元素的偏移地址
  public native int arrayBaseOffset(Class<?> arrayClass);
   //获取给定数组的元素增量地址,也就是说每个元素的占位数
  public native int arrayIndexScale(Class<?> arrayClass);

  ///--------------------锁指令(synchronized)-------------------------------
  //对象加锁
  public native void monitorEnter(Object o);
  //对象解锁
  public native void monitorExit(Object o);
  public native boolean tryMonitorEnter(Object o);
  //解除给定线程的阻塞
  public native void unpark(Object thread);
  //阻塞当前线程
  public native void park(boolean isAbsolute, long time);

  // CAS
  public final native boolean compareAndSwapObject(Object o, long offset,
                                                 Object expected,
                                                 Object x);
   //....
}

   

二、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来,如常用的ReentrantLock、CountDownLatch等。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
   
    //....
    static final class Node {
        /** 标识节点当前在共享模式下 */
        static final Node SHARED = new Node();
        /** 标识节点当前在独占模式下 */
        static final Node EXCLUSIVE = null;

        /** 值为1,当前节点由于超时或中断被取消。 */
        static final int CANCELLED =  1;
        /** 后继节点在等待当前节点唤醒 */
        static final int SIGNAL    = -1;
        /** 等待在 condition 上 */
        static final int CONDITION = -2;
        /**
         * 状态需要向后传播,针对共享锁
         */
        static final int PROPAGATE = -3;

        /**
         * 状态,默认 0  上面列出的几个常量状态  0代表没有被占用
         */
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;
        //...
    }

    /**
    *  队列维护的state值,
    */
    private volatile int state;

    /**
    *  队列维护的state值内存地址的偏移量,通过unsafe类修改(CAS)
    */
    private static final long stateOffset;
   
}
      
      

同步队列其内部都是一个双向链表结构,主要的内容包含thread + waitStatus + pre + next

 

 以ReentrantLock为例,进行代码阅读:

1、ReentrantLock下面有三个内部类:Sync同步器,FairSync同步器, NonfairSync同步器。
2、AQS(AbstractQueuedSynchronizer)继承AOS(AbstractOwnableSynchronizer)
2、Sync继承AQS(AbstractQueuedSynchronizer:设置和获取独占锁的拥有者线程)
3、NonfairSync(非公平锁)、FairSync(公平锁)分别继承Sync

 

 

public class ReentrantLock implements Lock, java.io.Serializable {
    
  
    private final Sync sync;
    
    //ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //....
    }

    /**
     * 非公平锁
     */
    static final class NonfairSync extends Sync {
        //....
    }

    /**
     * 公平锁
     */
    static final class FairSync extends Sync {
        //....
    }
    
    //默认构造函数,生成非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * 根据实际情况,构造公平锁和非公平锁
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    //....
}

  

2.1 公平锁

首先以公平锁FairSync为例

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
       
        //争锁
        final void lock() {
            acquire(1);
        }
        //代码来自父类AbstractQueuedSynchronizer,
        //如果tryAcquire(arg) 返回true, 也就结束了。
        // 否则,acquireQueued方法会将线程压到队列中
        public final void acquire(int arg) {
          //尝试获取suo
          if (!tryAcquire(arg) &&
            // addWaiter 获取资源/锁失败后,将当前线程加入同步队列的尾部,并标记为独占模式,返回新的同步队列;
            // 使线程在同步队列等待获取资源,一直获取到后才返回,如果在等待过程中被中断过,则返回true,否则返回false。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //没有获取到锁,放到同步队列后,这个时候需要把当前线程挂起,
            selfInterrupt();
         }


        /**
         * 尝试直接获取锁,返回值是boolean
         * 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // state == 0 此时此刻没有线程持有锁
            if (c == 0) {
                //虽然此时此刻锁是可以用的,但是这是公平锁,要求先来先到
                //hasQueuedPredecessors :当前线程是否还有前节点
                //compareAndSetState 通过CAS操作修改AQS类中stateOffset地偏移量对应的state值
                // 设置锁的持有者为当前线程。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前锁状态state不为0,同时当前线程已经持有锁,由于锁是可重入(多次获取)的,则更新重入后的锁状态
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

       //判断当前线程是否还有前节点
       public final boolean hasQueuedPredecessors() {
          // The correctness of this depends on head being initialized
          // before tail and on head.next being accurate if the current
          // thread is first in queue.
          Node t = tail; // Read fields in reverse initialization order
          Node h = head;
          Node s;
          //队列不为空,且头结点的下一个元素不是当前节点
          return h != t &&
              ((s = h.next) == null || s.thread != Thread.currentThread());
      }
      //父类AbstractOwnableSynchronizer(AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer)中的方法 设置锁的持有者为当前线程。
      protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
      }

      //以尾差法的方式插入到同步队列中
      private Node addWaiter(Node mode) {
          Node node = new Node(Thread.currentThread(), mode);
          // Try the fast path of enq; backup to full enq on failure
          Node pred = tail;
          if (pred != null) {
              node.prev = pred;
              if (compareAndSetTail(pred, node)) {
                  pred.next = node;
                  return node;
              }
          }
          //pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
          enq(node);
          return node;
      }
      //(队列是空的) 或者 CAS失败(有线程在竞争入队)
      //采用自旋的方式入队: CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
      private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //通过CAS操作,对双向链表的head和tail初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
                //此时没有return,初始化链表的头尾节点,继续for循环,走到else分支
            } else {
                //将当前线程排到队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    //...
  }

在公平锁的lock中,当没有获取到锁,将当前线程加入到同步队列后,会调用acquireQueued方法。使线程在同步队列等待获取资源,一直获取到后才返回,如果在等待过程中被中断过,则返回true,否则返回false。

//将线程放入阻塞队列中
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //获取前驱节点
                //p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
                //所以当前节点可以去试抢一下锁(首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node(enq(node)))
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
             // tryAcquire() 方法抛异常的情况
            if (failed)
                cancelAcquire(node);
        }
    }
    
    //当前线程没有抢到锁,是否需要挂起当前线程
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 前驱节点 waitStatus大于0 ,说明前驱节点取消了排队。
        // 需要知道这点: 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
        // 将当前节点的prev指向waitStatus<=0的节点
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2(使用condition条件下),-3(针对共享锁)
            // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
            // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
            // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
       // 这个方法返回 false,那么会再走一次 acquireQueued 中的for循序
       // 此时会从第一个分支返回 true
       return false;
    }

    //将node节点作为头结点
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    
    //负责挂起线程,通过LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

最后,就是还介绍下唤醒的动作unlock。正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this); 挂起停止,等待被唤醒。

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //唤醒队列中的节点
            return true;
        }
        return false;
    }


    protected final boolean tryRelease(int releases) {
      	int c = getState() - releases;
        //判断当前线程是否持有锁
      	if (Thread.currentThread() != getExclusiveOwnerThread())
        	throw new IllegalMonitorStateException();
        // 是否完全释放锁
      	boolean free = false;
      	if (c == 0) {
        	free = true;
        	setExclusiveOwnerThread(null);
     	 	}
      	setState(c);
      	return free;
    }
     
    //唤醒后继节点,从调用处知道参数node是head头结点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //如果head节点当前waitStatus<0, 将其修改为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)所以需要从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

总结公平锁的流程:

2.2 非公平锁

非公平锁NonfairSync代码如下,通过对比公平锁和非公平锁tryAcquire的代码可以看到,非公平锁的获取略去了!hasQueuedPredecessors()这一操作,也就是说它不会判断当前线程是否还有前节点(prev node)在等待获取锁,而是直接去进行锁获取操作。

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //主要区别点
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

非公平锁和公平锁的两处不同: 

1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了,若失败则调用tryAcquire;而公平锁则是直接调用tryAcquire方法。

2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态(是否处在队列的头部位置),如果有则不去抢锁,乖乖排到后面。如果非公平锁在两次 CAS操作后都不成功,那么它和公平锁是一样的,都要进入到阻塞队列等待唤醒。

3、在非公平锁下,当同步队列中有节点时,这时候来了新的更多的线程来抢锁,这些新线程还没有加入到同步队列中去,新线程会跟排队中苏醒的线程进行锁争抢,失败的去同步队列中排队。这里的公平与否,针对的是苏醒线程与还未加入同步队列的线程。对于已经在同步队列中阻塞的线程而言,它们内部自身其实是公平的,因为它们是按顺序被唤醒的,

标签:node,JUC,AQS,CAS,public,int,线程,final,Node
来源: https://www.cnblogs.com/helloworldcode/p/16367133.html