编程语言
首页 > 编程语言> > 【并发编程】并发包中工具类的基础:AQS

【并发编程】并发包中工具类的基础:AQS

作者:互联网

AQS为什么要学?

java层面如何实现管程?

AQS是什么?

AQS.png

AQS的同步等待队列:

AQS的条件等待队列:

AQS必须具备的功能

AQS的内部属性:volatile int state

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    // 核心方法,通过cas去改变State的值!
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

AQS的资源共享方式

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

AQS的节点状态

static final int CANCELLED =  1;
static final int SIGNAL    = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

自定义同步器实现时重点关注什么?

    // 该线程是否正在独占资源。只有用到condition才需要去实现它。
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    // 独占方式。尝试获取资源,成功则返回true,失败则返回false。
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    // 独占方式。尝试释放资源,成功则返回true,失败则返回false。
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    // 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    // 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

我的自定义锁

public class ZhangWeiLock extends AbstractQueuedSynchronizer {

	/**
	 * 尝试获取锁
	 */
	@Override
	protected boolean tryAcquire(int arg) {
		// cas把state=0变为state=1
		if (compareAndSetState(0, 1)) {
			// 设置执行的线程
			setExclusiveOwnerThread(Thread.currentThread());
			// 返回获取锁成功
			return true;
		}

		// 返回获取锁成功
		return false;
	}

	/**
	 * 释放锁
	 */
	@Override
	protected boolean tryRelease(int arg) {
		// 只有获取锁的线程才能释放锁
		setExclusiveOwnerThread(null);
		setState(0);

		// 返回释放锁成功
		return true;
	}

	public void lock() {
		acquire(1);
	}

	public boolean tryLock() {
		return tryAcquire(1);
	}

	public void unlock() {
		release(1);
	}

	public boolean isLocked() {
		return isHeldExclusively();
	}

}

测试方法

public class TsetZhangWeiLock {

	// 定义数据
	private static int sum = 0;

	// 定义自己的锁
	private static ZhangWeiLock lock = new ZhangWeiLock();

	public static void main(String[] args) throws InterruptedException {

		// 循环3吃,开启三个线程
		for (int i = 0; i < 3; i++) {
			Thread thread = new Thread(() -> {
				// 加锁
				lock.lock();
				try {
					// 不加锁这里执行完小于30000
					for (int j = 0; j < 10000; j++) {
						sum++;
					}
				} finally {
					// 解锁
					lock.unlock();
				}
			});
			thread.start();
		}

		// 等待线程执行完
		Thread.sleep(2000);

		// 打印最后的结果
		System.out.println(sum);
	}
}

测试结果

自定义锁的结果.png

JDK中是如何自定义同步器的?

ReentrantLock的锁实现.png

AQS的等待唤醒机制内部类:Condition

在AbstractQueuedSynchronizer类中的源码位置

    public class ConditionObject implements Condition, java.io.Serializable {

Condition接口的方法

Condition接口的方法.jpg

验证等待唤醒:使用ReentrantLock(后续有针对ReentrantLock深入的源码分析)

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TsstWait {

	public static void main(String[] args) {

		Lock lock = new ReentrantLock();
		Condition condition = lock.newCondition();

		new Thread(() -> {
			lock.lock();
			try {
				log.debug(Thread.currentThread().getName() + " 开始处理任务");
				condition.await();
				log.debug(Thread.currentThread().getName() + " 结束处理任务");
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				lock.unlock();
			}
		}).start();

		new Thread(() -> {
			lock.lock();
			try {
				log.debug(Thread.currentThread().getName() + " 开始处理任务");

				Thread.sleep(2000);
				condition.signal();
				log.debug(Thread.currentThread().getName() + " 结束处理任务");
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				lock.unlock();
			}
		}).start();
	}
}

等待唤醒结果

等待唤醒结果的验证结果.png

结束语

标签:AQS,int,lock,编程,state,线程,new,发包
来源: https://blog.csdn.net/qq_35436158/article/details/122752958