【并发编程】读写锁ReentrantReadWriteLock从入门到源码精通
作者:互联网
什么是读写锁?
- 在没有写操作的时候,多个线程同时读一个资源没有任何问题,允许多个线程同时读取共享资源(读读可以并发)。
- 如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥)。
- 在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。
- 读写锁在内部维护了一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁。
- 线程进入读锁的前提条件:没有其他线程的写锁;没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
- 线程进入写锁的前提条件:没有其他线程的读锁;没有其他线程的写锁。
读写锁的重要特性
- 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
- 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
- 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁。
ReentrantReadWriteLock的使用
- ReentrantReadWriteLock是可重入的读写锁实现类。
- 在它内部,维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。
- 只要没有 Writer 线程,读锁可以由多个 Reader 线程同时持有。
- 也就是说,写锁是独占的,读锁是共享的。
ReentrantReadWriteLock使用的注意事项
- 读锁不支持条件变量
- 重入时升级不支持:持有读锁的情况下去获取写锁,会导致获取永久等待
- 重入时支持降级: 持有写锁的情况下可以去获取读锁
ReentrantReadWriteLock的应用场景
- 有共享变量并且读多写少的场景!
锁降级
- 锁降级指的是写锁降级成为读锁。主要是为了防止数据没有刷回到主内存,导致其他线程取到的值不一致!
- 没有锁升级:因为大量线程获取读锁,其中一个线程变为写锁改了数据,其他线程不可知,导致其他线程取到的值不一致!
- 如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。
- 锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
- 锁降级可以帮助我们拿到当前线程修改后的结果而不被其他线程所破坏,防止更新丢失。
ReentrantReadWriteLock锁的使用方式
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// 处理读读操作用同事进行;读写,写读,写写都会同时进行!
public class ReentrantReadWriteLockTest1 {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 读操作,用读锁
public final Object read(String key) throws InterruptedException {
r.lock();
try {
System.out.println(System.currentTimeMillis() + "读锁获取成功...");
Thread.sleep(6000);
System.out.println(System.currentTimeMillis() + "读锁执行完成...");
return map.get(key);
} finally {
System.out.println(System.currentTimeMillis() + "读锁释放...");
r.unlock();
}
}
// 写操作,用写锁
public final Object write(String key, Object value) throws InterruptedException {
w.lock();
try {
System.out.println(System.currentTimeMillis() + "写锁获取成功...");
Thread.sleep(6000);
System.out.println(System.currentTimeMillis() + "写锁执行完成...");
return map.put(key, value);
} finally {
System.out.println(System.currentTimeMillis() + "写锁释放...");
w.unlock();
}
}
public static void main(String[] args) {
ReentrantReadWriteLockTest1 lock = new ReentrantReadWriteLockTest1();
new Thread(() -> {
try {
lock.read("1");
// lock.write("1","2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock.read("1");
// lock.write("1","2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
锁降级的使用方式
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReentrantReadWriteLockTest1 {
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock readLock = rwl.readLock();
private final Lock writeLock = rwl.writeLock();
private volatile boolean update = false;
public void test() {
readLock.lock();
if (!update) {
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if (!update) {
// TODO 准备数据的流程(略)
update = true;
}
// =====这行代码就是锁降级的开始代码=====
readLock.lock();
} finally {
writeLock.unlock();
}
// 锁降级完成,写锁降级为读锁
}
try {
// TODO 使用数据的流程(略)
} finally {
// =====这行代码就是锁降级的结束代码=====
readLock.unlock();
}
}
}
ReentrantReadWriteLock源码流程图
ReentrantReadWriteLock读写状态源码分析:一个变量维护多个状态!
// 共享的移位量
static final int SHARED_SHIFT = 16;
// 共享的单位:00000000 00000001 00000000 00000000
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 共享的最大数量:00000000 00000000 11111111 11111111
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 独占的单位:00000000 00000000 11111111 11111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
// 计算共享的数量(读锁):高16位表示。读锁可以同时被多个线程持有,每个线程持有的读锁支持重入的特性,所以需要对每个线程持有的读锁的数量单独计数,这就需要用到 HoldCounter 计数器
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
// 计算独占的重入数量(写锁):低16位表示。
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
ReentrantReadWriteLock的读锁计数器HoldCounter源码分析
/**
* 读锁的本质是共享锁,一次共享锁的操作就相当于对HoldCounter 计数器的操作。
* 获取共享锁,则该计数器 + 1,释放共享锁,该计数器 - 1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。
* HoldCounter是用来记录读锁重入数的对象
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
/**
* 通过 ThreadLocalHoldCounter 类,HoldCounter 与线程进行绑定。
* HoldCounter 是绑定线程的一个计数器,而 ThreadLocalHoldCounter 则是线程绑定的 ThreadLocal。
* ThreadLocalHoldCounter是ThreadLocal变量,用来存放不是第一个获取读锁的线程的其他线程的读锁重入数对象
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
ReentrantReadWriteLock构造方法源码分析
/**
* 无参构造直接调用有参构造,非公平模式!
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* 传入是否公平模式
*/
public ReentrantReadWriteLock(boolean fair) {
// 区别为公平模式
sync = fair ? new FairSync() : new NonfairSync();
// 初始化读锁
readerLock = new ReadLock(this);
// 初始化写锁
writerLock = new WriteLock(this);
}
ReentrantReadWriteLock写锁加锁源码分析
/**
* 直接调用AQS的获取独占锁逻辑
*/
public void lock() {
sync.acquire(1);
}
/**
* AQS的获取独占锁逻辑
*/
public final void acquire(int arg) {
// tryAcquire源码在下方、acquireQueued在AQS中,与ReentrantLock实现方式一致。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 尝试获取锁
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取当前的状态
int c = getState();
// 获取写锁的状态
int w = exclusiveCount(c);
// 总状态不为0,可能有读有些
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 写锁为0,或者当前线程不是持有锁的线程。返回尝试获取锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 写锁数量太多,抛异常!
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 执行到这里,是当前的线程,进行重入处理
setState(c + acquires);
// 返回尝试获取锁成功
return true;
}
// 同步队列中有排队的,并且不可以重入的时候,返回尝试获取锁失败
// cas变更实际的状态失败,返回尝试获取锁失败
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 代码执行带这里,表示可以获取到锁
// 设置持有写锁的线程为当前线程
setExclusiveOwnerThread(current);
// 返回尝试获取锁成功
return true;
}
/**
* 公平锁的判断队列中是否有排队的
*/
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
/**
* 非公平锁的判断队列中是否有排队的,直接返回false
*/
final boolean writerShouldBlock() {
return false; // writers can always barge
}
/**
* 判断队列中是否有排队的
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 未初始化的时候,队列头部尾部的值为null,相等----不满足
// 只有一个的时候,头尾相等----不满足
// 头部的下一个为null,说明只有一个----满足
// 下一个的线程是当前线程,重入了----满足
// 简单说:链表中只有一个或者链表是空的,返回false。链表中有多个,不满足重入的机制,返回false。只有链表中有多个数据并且持有线程不是当前线程的时候才会返回true!
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
ReentrantReadWriteLock写锁释放源码分析
/**
* 写锁的释放直接调用AQS的释放逻辑。
*/
public void unlock() {
sync.release(1);
}
/**
* 释放锁逻辑:读锁写锁这个方法调用的是同一个
*/
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
// 释放锁成功,获取头结点
Node h = head;
// 头结点不为null并且当前节点的状态不在初始化状态
if (h != null && h.waitStatus != 0)
// unpark去唤醒队列中的下一个线程
unparkSuccessor(h);
// 返回解锁成功
return true;
}
// 返回解锁失败
return false;
}
/**
* ReentrantReadWriteLock的写锁尝试释放锁逻辑
*/
protected final boolean tryRelease(int releases) {
// 不是当前线程,直接抛出异常!
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 得到解锁够的状态值
int nextc = getState() - releases;
// 得到低位是否可以变为0
boolean free = exclusiveCount(nextc) == 0;
// 如果可以变为0(没有变为0是重入了),解除当前线程的绑定
if (free)
setExclusiveOwnerThread(null);
// 将新的状态设置进去
setState(nextc);
// 返回是否解锁成功
return free;
}
ReentrantReadWriteLock读锁加锁源码分析
/**
* 直接调用AQS获取共享锁的逻辑
*/
public void lock() {
sync.acquireShared(1);
}
/**
* 获取共享锁
*/
public final void acquireShared(int arg) {
// 尝试获取锁
if (tryAcquireShared(arg) < 0)
// 这个循环获取共享锁
doAcquireShared(arg);
}
/**
* 尝试去获取锁
*/
protected final int tryAcquireShared(int unused) {
// 获取当前的线程
Thread current = Thread.currentThread();
// 获取当前锁对象的状态
int c = getState();
// 独占锁不是0并且持有写锁的线程不是当前线程!返回负数(没有获取到锁)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 得到当前锁的读状态
int r = sharedCount(c);
// 队列中没有排队的
// 并且读的数量没有大于最大数量
// 并且可以cas成功将当前锁变更成功(获取到)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁等于0,第一个线程
if (r == 0) {
// 设置当前持有锁的线程为当前线程
firstReader = current;
// 设置持有的共享数量为1
firstReaderHoldCount = 1;
// 如果第一个持有锁的线程是当前线程,重入次数加1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 不是第一个线程的时候会执行这里的逻辑
// 得到当前线程的计数器
HoldCounter rh = cachedHoldCounter;
// 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
if (rh == null || rh.tid != getThreadId(current))
// 给当前线程创建一个计数器对象
cachedHoldCounter = rh = readHolds.get();
// 当前计数器的数量为0,没有读锁
else if (rh.count == 0)
// 给计数器设置值
readHolds.set(rh);
// 计数器数量加1
rh.count++;
}
// 返回尝试成功
return 1;
}
// 完全尝试获取共享锁
return fullTryAcquireShared(current);
}
/**
* 完全尝试获取共享锁
*/
final int fullTryAcquireShared(Thread current) {
// 定义计数器
HoldCounter rh = null;
for (;;) {
// 获取当前锁的状态
int c = getState();
// 独占锁状态不是0,说明存在独占锁
if (exclusiveCount(c) != 0) {
// 持有写锁的线程不是当前线程,返回负数(没有获取到锁)
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
// 队列中有排队的
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly:确保我们没有以可重入方式获取读锁
// 是当前线程,跳过
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 计数器是空的
if (rh == null) {
// 给计数器赋一个值
rh = cachedHoldCounter;
// 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
if (rh == null || rh.tid != getThreadId(current)) {
// 赋值新的计数器
rh = readHolds.get();
// 计数器的数量为0,没有读锁,干掉当前计数器
if (rh.count == 0)
readHolds.remove();
}
}
// 计数器数量为0,没有读锁,返回获取锁失败
if (rh.count == 0)
return -1;
}
}
// 共享次数等于最大值,抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// cas成功将当前锁变更成功(获取到锁)
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 共享锁(读锁)数量等于0
if (sharedCount(c) == 0) {
// 设置当前持有锁的线程为当前线程
firstReader = current;
// 设置持有的共享数量为1
firstReaderHoldCount = 1;
// 如果第一个持有锁的线程是当前线程,重入次数加1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 当前线程的计数器为空:给这个计数器复制
if (rh == null)
rh = cachedHoldCounter;
// 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 当前计数器的数量为0,没有读锁
else if (rh.count == 0)
// 给计数器设置值
readHolds.set(rh);
// 计数器数量加1
rh.count++;
// 给当前线程设置计数器
cachedHoldCounter = rh; // cache for release
}
// 返回获取锁成功
return 1;
}
}
}
/**
* 非公平锁的查看队列中是否有排队的
*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
/**
* 公平锁的查看队列中是否有排队的
*/
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
/**
* 这个方法为了让后续的方法有获取锁的可能性,防止一直等待
*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 返回同步队列头结点不是空的
// 并且头结点的下一个节点不是空的
// 并且头节点的下一个节点不是共析模式
// 并且头节点的下一个节点的线程不是空的
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
/**
* 循环去获取共享锁
*/
private void doAcquireShared(int arg) {
// 添加到同步队列,返回当前节点。addWaiter在AQS中,与ReentrantLock的实现方式一致
final Node node = addWaiter(Node.SHARED);
// 定义失败标志位true
boolean failed = true;
try {
// 定义中断标记为false
boolean interrupted = false;
for (;;) {
// 得到当前节点的前驱节点
final Node p = node.predecessor();
// 如果前置节点是头结点
if (p == head) {
// 尝试去获取锁
int r = tryAcquireShared(arg);
// 获取锁成功
if (r >= 0) {
// 设置头结点和链表。setHeadAndPropagate在AQS中,与CountDownLatch的实现方式一致
setHeadAndPropagate(node, r);
// 取消指向,方便GC去回收
p.next = null; // help GC
// 中断状态变了,设置当前线程被中断
if (interrupted)
selfInterrupt();
// 更改失败标志位false
failed = false;
// 结束循环
return;
}
}
// houldParkAfterFailedAcquire和parkAndCheckInterrupt在AQS中,与CountDownLatch的实现方式一致
// 代码执行到这里,说明尝试获取锁,但是获取锁失败了。
// 阻塞前的准备工作操作成功(状态是-1的时候成功)
// 将线程阻塞,等待他去唤醒。唤醒后返回线程的中断状态!
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 上面代码抛出异常的时候,会执行这里的逻辑
if (failed)
// 取消获取锁的逻辑。cancelAcquire在AQS中,与CountDownLatch的实现方式一致
cancelAcquire(node);
}
}
ReentrantReadWriteLock读锁解锁源码分析
/**
* 直接调用AQS接触共享锁的逻辑
*/
public void unlock() {
sync.releaseShared(1);
}
/**
* 释放共享锁的逻辑
*/
public final boolean releaseShared(int arg) {
// 尝试去释放共享锁
if (tryReleaseShared(arg)) {
// 唤醒后继的节点并且保证继续传播。doReleaseShared在AQS中,与CountDownLatch的实现方式一致
doReleaseShared();
// 整体返回true,表示释放共享锁成功
return true;
}
// java规范的写法:必须有个返回值,不会执行到这里!
return false;
}
/**
* 尝试去释放共享锁
*/
protected final boolean tryReleaseShared(int unused) {
// 获取当前的线程
Thread current = Thread.currentThread();
// 第一个线程是当前线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 重入次数减一,只有一个的时候,直接变为null
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 不是第一个的线程,获取他的计数器
HoldCounter rh = cachedHoldCounter;
// 当前线程的计数器为空或者当前计数器不是属于当前线程:获取这个计数器
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 得到计数器的数量
int count = rh.count;
// 一个一下直接移除
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 计数器减一
--rh.count;
}
for (;;) {
// 得到当前的线程状态
int c = getState();
// 计算应该变为的下一个值(减去高16位的1)
int nextc = c - SHARED_UNIT;
// cas释放锁
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 释放读锁对读锁没有影响,但如果读锁和写锁现在都可用,则可能允许等待的写入程序继续。
return nextc == 0;
}
}
结束语
- 获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!
- 关注公众号,可以让你对MySQL有非常深入的了解
- 关注公众号,每天持续高效的了解并发编程!
- 关注公众号,后续持续高效的了解spring源码!
- 这个公众号,无广告!!!每日更新!!!
标签:ReentrantReadWriteLock,写锁,读写,计数器,获取,读锁,源码,线程,rh 来源: https://www.cnblogs.com/zfcq/p/15863460.html