编程语言
首页 > 编程语言> > 【并发编程】读写锁ReentrantReadWriteLock从入门到源码精通

【并发编程】读写锁ReentrantReadWriteLock从入门到源码精通

作者:互联网

什么是读写锁?

读写锁的重要特性

ReentrantReadWriteLock的使用

ReentrantReadWriteLock的主要方法.png

ReentrantReadWriteLock使用的注意事项

ReentrantReadWriteLock的应用场景

锁降级

ReentrantReadWriteLock锁的使用方式

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// 处理读读操作用同事进行;读写,写读,写写都会同时进行!
public class ReentrantReadWriteLockTest1 {
	static Map<String, Object> map = new HashMap<String, Object>();
	static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
	static Lock r = rwl.readLock();
	static Lock w = rwl.writeLock();

	// 读操作,用读锁
	public final Object read(String key) throws InterruptedException {
		r.lock();
		try {
			System.out.println(System.currentTimeMillis() + "读锁获取成功...");
			Thread.sleep(6000);
			System.out.println(System.currentTimeMillis() + "读锁执行完成...");
			return map.get(key);
		} finally {
			System.out.println(System.currentTimeMillis() + "读锁释放...");
			r.unlock();
		}
	}

	// 写操作,用写锁
	public final Object write(String key, Object value) throws InterruptedException {
		w.lock();
		try {
			System.out.println(System.currentTimeMillis() + "写锁获取成功...");
			Thread.sleep(6000);
			System.out.println(System.currentTimeMillis() + "写锁执行完成...");
			return map.put(key, value);
		} finally {
			System.out.println(System.currentTimeMillis() + "写锁释放...");
			w.unlock();
		}
	}

	public static void main(String[] args) {
		ReentrantReadWriteLockTest1 lock = new ReentrantReadWriteLockTest1();

		new Thread(() -> {
			try {
				lock.read("1");
				// lock.write("1","2");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();

		new Thread(() -> {
			try {
				lock.read("1");
				// lock.write("1","2");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
	}
}

锁降级的使用方式

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantReadWriteLockTest1 {

	private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
	private final Lock readLock = rwl.readLock();
	private final Lock writeLock = rwl.writeLock();
	private volatile boolean update = false;

	public void test() {
		readLock.lock();
		if (!update) {
			// 必须先释放读锁
			readLock.unlock();
			// 锁降级从写锁获取到开始
			writeLock.lock();
			try {
				if (!update) {
					// TODO 准备数据的流程(略)
					update = true;
				}
				// =====这行代码就是锁降级的开始代码=====
				readLock.lock();
			} finally {
				writeLock.unlock();
			}
			// 锁降级完成,写锁降级为读锁
		}
		try {
			// TODO 使用数据的流程(略)
		} finally {
			// =====这行代码就是锁降级的结束代码=====
			readLock.unlock();
		}
	}
}

ReentrantReadWriteLock源码流程图

ReentrantReadWriteLock源码流程图.png

ReentrantReadWriteLock读写状态源码分析:一个变量维护多个状态!

// 共享的移位量
static final int SHARED_SHIFT   = 16;
// 共享的单位:00000000 00000001 00000000 00000000
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 共享的最大数量:00000000 00000000 11111111 11111111
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 独占的单位:00000000 00000000 11111111 11111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
// 计算共享的数量(读锁):高16位表示。读锁可以同时被多个线程持有,每个线程持有的读锁支持重入的特性,所以需要对每个线程持有的读锁的数量单独计数,这就需要用到 HoldCounter 计数器
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
// 计算独占的重入数量(写锁):低16位表示。
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

ReentrantReadWriteLock的读锁计数器HoldCounter源码分析

/**
 * 读锁的本质是共享锁,一次共享锁的操作就相当于对HoldCounter 计数器的操作。
 * 获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。
 * HoldCounter是用来记录读锁重入数的对象
 */
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

/**
 * 通过 ThreadLocalHoldCounter 类,HoldCounter 与线程进行绑定。
 * HoldCounter 是绑定线程的一个计数器,而 ThreadLocalHoldCounter 则是线程绑定的 ThreadLocal。
 * ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象
 */
static final class ThreadLocalHoldCounter
	extends ThreadLocal<HoldCounter> {
	public HoldCounter initialValue() {
	    return new HoldCounter();
	}
}

ReentrantReadWriteLock构造方法源码分析

/**
 * 无参构造直接调用有参构造,非公平模式!
 */
public ReentrantReadWriteLock() {
    this(false);
}

/**
 * 传入是否公平模式
 */
public ReentrantReadWriteLock(boolean fair) {
    // 区别为公平模式
    sync = fair ? new FairSync() : new NonfairSync();
    // 初始化读锁
    readerLock = new ReadLock(this);
    // 初始化写锁
    writerLock = new WriteLock(this);
}

ReentrantReadWriteLock写锁加锁源码分析

写锁加锁获取锁逻辑图.png

/**
 * 直接调用AQS的获取独占锁逻辑
 */
public void lock() {
    sync.acquire(1);
}

/**
 * AQS的获取独占锁逻辑
 */
public final void acquire(int arg) {
    // tryAcquire源码在下方、acquireQueued在AQS中,与ReentrantLock实现方式一致。
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

/**
 * 尝试获取锁
 */
protected final boolean tryAcquire(int acquires) {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 获取当前的状态
    int c = getState();
    // 获取写锁的状态
    int w = exclusiveCount(c);
    // 总状态不为0,可能有读有些
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 写锁为0,或者当前线程不是持有锁的线程。返回尝试获取锁失败
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 写锁数量太多,抛异常!
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 执行到这里,是当前的线程,进行重入处理
        setState(c + acquires);
        // 返回尝试获取锁成功
        return true;
    }
    // 同步队列中有排队的,并且不可以重入的时候,返回尝试获取锁失败
    // cas变更实际的状态失败,返回尝试获取锁失败
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 代码执行带这里,表示可以获取到锁
    // 设置持有写锁的线程为当前线程
    setExclusiveOwnerThread(current);
    // 返回尝试获取锁成功
    return true;
}

/**
 * 公平锁的判断队列中是否有排队的
 */
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

/**
 * 非公平锁的判断队列中是否有排队的,直接返回false
 */
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

/**
 * 判断队列中是否有排队的
 */
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 未初始化的时候,队列头部尾部的值为null,相等----不满足
    // 只有一个的时候,头尾相等----不满足
    // 头部的下一个为null,说明只有一个----满足
    // 下一个的线程是当前线程,重入了----满足
    // 简单说:链表中只有一个或者链表是空的,返回false。链表中有多个,不满足重入的机制,返回false。只有链表中有多个数据并且持有线程不是当前线程的时候才会返回true!
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

ReentrantReadWriteLock写锁释放源码分析

/**
 * 写锁的释放直接调用AQS的释放逻辑。
 */
public void unlock() {
    sync.release(1);
}

/**
 * 释放锁逻辑:读锁写锁这个方法调用的是同一个
 */
public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        // 释放锁成功,获取头结点
        Node h = head;
        // 头结点不为null并且当前节点的状态不在初始化状态
        if (h != null && h.waitStatus != 0)
            // unpark去唤醒队列中的下一个线程
            unparkSuccessor(h);
        // 返回解锁成功
        return true;
    }
    // 返回解锁失败
    return false;
}

/**
 * ReentrantReadWriteLock的写锁尝试释放锁逻辑
 */
protected final boolean tryRelease(int releases) {
    // 不是当前线程,直接抛出异常!
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 得到解锁够的状态值
    int nextc = getState() - releases;
    // 得到低位是否可以变为0
    boolean free = exclusiveCount(nextc) == 0;
    // 如果可以变为0(没有变为0是重入了),解除当前线程的绑定
    if (free)
        setExclusiveOwnerThread(null);
    // 将新的状态设置进去
    setState(nextc);
    // 返回是否解锁成功
    return free;
}

ReentrantReadWriteLock读锁加锁源码分析

/**
 * 直接调用AQS获取共享锁的逻辑
 */
public void lock() {
    sync.acquireShared(1);
}

/**
 * 获取共享锁
 */
public final void acquireShared(int arg) {
    // 尝试获取锁
    if (tryAcquireShared(arg) < 0)
        // 这个循环获取共享锁
        doAcquireShared(arg);
}

/**
 * 尝试去获取锁
 */
protected final int tryAcquireShared(int unused) {
    // 获取当前的线程
    Thread current = Thread.currentThread();
    // 获取当前锁对象的状态
    int c = getState();
    // 独占锁不是0并且持有写锁的线程不是当前线程!返回负数(没有获取到锁)
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 得到当前锁的读状态
    int r = sharedCount(c);
    // 队列中没有排队的
    // 并且读的数量没有大于最大数量
    // 并且可以cas成功将当前锁变更成功(获取到)
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 读锁等于0,第一个线程
        if (r == 0) {
            // 设置当前持有锁的线程为当前线程
            firstReader = current;
            // 设置持有的共享数量为1
            firstReaderHoldCount = 1;
        // 如果第一个持有锁的线程是当前线程,重入次数加1
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // 不是第一个线程的时候会执行这里的逻辑
            // 得到当前线程的计数器
            HoldCounter rh = cachedHoldCounter;
            // 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
            if (rh == null || rh.tid != getThreadId(current))
                // 给当前线程创建一个计数器对象
                cachedHoldCounter = rh = readHolds.get();
            // 当前计数器的数量为0,没有读锁
            else if (rh.count == 0)
                // 给计数器设置值
                readHolds.set(rh);
            // 计数器数量加1
            rh.count++;
        }
        // 返回尝试成功
        return 1;
    }
    // 完全尝试获取共享锁
    return fullTryAcquireShared(current);
}

/**
 * 完全尝试获取共享锁
 */
final int fullTryAcquireShared(Thread current) {
    // 定义计数器
    HoldCounter rh = null;
    for (;;) {
        // 获取当前锁的状态
        int c = getState();
        // 独占锁状态不是0,说明存在独占锁
        if (exclusiveCount(c) != 0) {
            // 持有写锁的线程不是当前线程,返回负数(没有获取到锁)
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        // 队列中有排队的
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly:确保我们没有以可重入方式获取读锁
            // 是当前线程,跳过
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                // 计数器是空的
                if (rh == null) {
                    // 给计数器赋一个值
                    rh = cachedHoldCounter;
                    // 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
                    if (rh == null || rh.tid != getThreadId(current)) {
                        // 赋值新的计数器
                        rh = readHolds.get();
                        // 计数器的数量为0,没有读锁,干掉当前计数器
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 计数器数量为0,没有读锁,返回获取锁失败
                if (rh.count == 0)
                    return -1;
            }
        }
        // 共享次数等于最大值,抛出异常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // cas成功将当前锁变更成功(获取到锁)
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 共享锁(读锁)数量等于0
            if (sharedCount(c) == 0) {
                // 设置当前持有锁的线程为当前线程
                firstReader = current;
                // 设置持有的共享数量为1
                firstReaderHoldCount = 1;
            // 如果第一个持有锁的线程是当前线程,重入次数加1
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                // 当前线程的计数器为空:给这个计数器复制
                if (rh == null)
                    rh = cachedHoldCounter;
                // 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                // 当前计数器的数量为0,没有读锁
                else if (rh.count == 0)
                    // 给计数器设置值
                    readHolds.set(rh);
                // 计数器数量加1
                rh.count++;
                // 给当前线程设置计数器
                cachedHoldCounter = rh; // cache for release
            }
            // 返回获取锁成功
            return 1;
        }
    }
}

/**
 * 非公平锁的查看队列中是否有排队的
 */
final boolean readerShouldBlock() {
    return apparentlyFirstQueuedIsExclusive();
}

/**
 * 公平锁的查看队列中是否有排队的
 */
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

/**
 * 这个方法为了让后续的方法有获取锁的可能性,防止一直等待
 */
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    // 返回同步队列头结点不是空的
    // 并且头结点的下一个节点不是空的
    // 并且头节点的下一个节点不是共析模式
    // 并且头节点的下一个节点的线程不是空的
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

/**
 * 循环去获取共享锁
 */
private void doAcquireShared(int arg) {
    // 添加到同步队列,返回当前节点。addWaiter在AQS中,与ReentrantLock的实现方式一致
    final Node node = addWaiter(Node.SHARED);
    // 定义失败标志位true
    boolean failed = true;
    try {
        // 定义中断标记为false
        boolean interrupted = false;
        for (;;) {
            // 得到当前节点的前驱节点
            final Node p = node.predecessor();
            // 如果前置节点是头结点
            if (p == head) {
                // 尝试去获取锁
                int r = tryAcquireShared(arg);
                // 获取锁成功
                if (r >= 0) {
                    // 设置头结点和链表。setHeadAndPropagate在AQS中,与CountDownLatch的实现方式一致
                    setHeadAndPropagate(node, r);
                    // 取消指向,方便GC去回收
                    p.next = null; // help GC
                    // 中断状态变了,设置当前线程被中断
                    if (interrupted)
                        selfInterrupt();
                    // 更改失败标志位false
                    failed = false;
                    // 结束循环
                    return;
                }
            }
            // houldParkAfterFailedAcquire和parkAndCheckInterrupt在AQS中,与CountDownLatch的实现方式一致
            // 代码执行到这里,说明尝试获取锁,但是获取锁失败了。
            // 阻塞前的准备工作操作成功(状态是-1的时候成功)
            // 将线程阻塞,等待他去唤醒。唤醒后返回线程的中断状态!
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 上面代码抛出异常的时候,会执行这里的逻辑
        if (failed)
            // 取消获取锁的逻辑。cancelAcquire在AQS中,与CountDownLatch的实现方式一致
            cancelAcquire(node);
    }
}

ReentrantReadWriteLock读锁解锁源码分析

/**
 * 直接调用AQS接触共享锁的逻辑
 */
public void unlock() {
    sync.releaseShared(1);
}

/**
 * 释放共享锁的逻辑
 */
public final boolean releaseShared(int arg) {
    // 尝试去释放共享锁
    if (tryReleaseShared(arg)) {
        // 唤醒后继的节点并且保证继续传播。doReleaseShared在AQS中,与CountDownLatch的实现方式一致
        doReleaseShared();
        // 整体返回true,表示释放共享锁成功
        return true;
    }
    // java规范的写法:必须有个返回值,不会执行到这里!
    return false;
}

/**
 * 尝试去释放共享锁
 */
protected final boolean tryReleaseShared(int unused) {
    // 获取当前的线程
    Thread current = Thread.currentThread();
    // 第一个线程是当前线程
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        // 重入次数减一,只有一个的时候,直接变为null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 不是第一个的线程,获取他的计数器
        HoldCounter rh = cachedHoldCounter;
        // 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        // 得到计数器的数量
        int count = rh.count;
        // 一个一下直接移除
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 计数器减一
        --rh.count;
    }
    for (;;) {
        // 得到当前的线程状态
        int c = getState();
        // 计算应该变为的下一个值(减去高16位的1)
        int nextc = c - SHARED_UNIT;
        // cas释放锁
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            // 释放读锁对读锁没有影响,但如果读锁和写锁现在都可用,则可能允许等待的写入程序继续。
            return nextc == 0;
    }
}

结束语

标签:ReentrantReadWriteLock,写锁,读写,计数器,获取,读锁,源码,线程,rh
来源: https://www.cnblogs.com/zfcq/p/15863460.html