并发编程-读写锁ReentranReadWriteLock应用场景及源码解析
作者:互联网
目录
前言
假设你现在需要实现这样一个需求
给你一个Map集合(共享资源),实现如下需求
- 可以允许两个线程同时调用Map的get方法读取数据
- 不允许两个线程同时调用Map的put方法修改数据
- 不允许两个线程一个调用put方法,一个调用get方法,也就是说当一个线程调用put方法时,另一个想调用get方法的线程需要阻塞等put方法完成
你会如何实现?
这里可能会有同学想到用ConcurentHashMap,确实ConcurentHashMap可以满足第一点和第二点,但是它的put方法和get方法是相互隔离的,也就是说满足不了第三点,这种情况下就需要用到读写锁了
读写锁介绍
什么是读写锁
读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。
特性
这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读线程来访问共享资源,最大可能的读线程数为实际的逻辑CPU数。写线程是排他性的,一个读写锁同时只能有一个写线程或多个读线程(与CPU数相关),但不能同时既有读线程又有写线程。
- 读-读能共存(可以用多个线程同时的读)
- 读-写不能共存(读的时候不能有其他线程去修改,或者修改的时候不能有其他线程去读)
- 写-写不能共存(修改的时候不能再有其他线程去修改)
具体实现
读写锁在Java中的具体实现是ReentranReadWriteLock
ReentranReadWriteLock实现了ReadWriteLock接口,里面定义了读锁和写锁
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
然后我们来粗略看一下ReentranReadWriteLock的基本结构
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
//内部类,读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
//内部类,写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
//这个sync大家应该很熟悉了吧,继承了AQS
final Sync sync;
//可以指定是否公平锁
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
//内部类ReadLock
public static class ReadLock implements Lock, java.io.Serializable {
//里面也有一个sync
private final Sync sync;
//注意这里的sync是外面的ReentrantReadWriteLock传进来的
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
//加锁的方法
public void lock() {
sync.acquireShared(1);
}
//解锁
public void unlock() {
sync.releaseShared(1);
}
.........
}
粗略看了下,我们可以得到几个信息
- ReentrantReadWriteLock支持指定是否公平锁
- 和ReentrantLock同样间接继承了AQS,通过sync做锁的控制
- 通过构造器传入的同一个sync和两个不同的静态内部类来控制读写锁的分离
现在我们基本知道了ReentranReadWriteLock的内部结构,下一步我们先来看看它的一些基本使用,
实践出真知,我们来看看如何用它来解决上面的共享Map的问题
public class ReadWriteLockTest {
public static void main(String[] args) {
//这里使用我们自定义的map,主要是为了给大家模拟当put比较慢时会出现的情况
TestMap<Integer, Integer> map = new TestMap<>();
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
try {
map.put(finalI, finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
map.get(finalI);
}).start();
}
}
//定义一个map
static class TestMap<K, T>{
//定义一个map
private Map<K, T> map = new HashMap<>();
//取值的方法
T get(K key) {
T t = map.get(key);
System.out.println("取到值" + t);
return t;
}
//存值的方法
void put(K key, T value) throws InterruptedException {
//模拟map插入的过程,方便演示问题
Thread.sleep(500);
map.put(key, value);
System.out.println("插入值" + key);
}
}
}
运行结果如下:
取到值null
取到值null
取到值null
取到值null
取到值null
插入值0
插入值1
插入值4
插入值2
插入值3
上图代码会出现的问题就是,当我在往map里面存值还没存完的时候,其他线程同时取值时,这时取出来的是空的(因为还没存进去)
那么如果要实现当有人正在存值时,此时取值的人必须先等待所有存值完毕后,再去取,该如何实现?下面我们来改造下TestMap
//定义一个支持读写锁的map
static class TestMap<K, T>{
//定义一个map
private Map<K, T> map = new HashMap<>();
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
//读锁
private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
//写锁
private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
//取值的方法
T get(K key) {
readLock.lock();
T t = map.get(key);
System.out.println("取到值" + t);
readLock.unlock();
return t;
}
//存值的方法
void put(K key, T value) throws InterruptedException {
writeLock.lock();
//模拟map插入的过程,方便演示问题
Thread.sleep(500);
map.put(key, value);
System.out.println("插入值" + key);
writeLock.unlock();
}
}
运行结果如下:
插入值0
插入值1
插入值2
插入值3
插入值4
取到值0
取到值1
取到值2
取到值3
取到值4
可以看到加锁之后的结果是符合我们的预期的,也就是写线程是优先的,这里提醒下大家unlock方法最好放到finally里面执行,防止不必要的死锁,我这里是为了方便
那么ReentranReadWriteLock源码里是怎么做到写线程独享资源, 读线程共享资源的呢?
下面让源码来解答大家的疑问
源码解析
在文章的开头我们已经了解了ReentranReadWriteLock的内部结构,首先我们来看一下读锁的lock方法
readLock.lock()
public void lock() {
//从名字可以发现这个是共享锁
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
//尝试获取共享锁
if (tryAcquireShared(arg) < 0)
//获取锁失败的话会进入这里
doAcquireShared(arg);
}
tryAcquireShared
从名称可以看到这个是共享锁,然后我们来看看tryAcquireShared方法
Thread current = Thread.currentThread();
int c = getState();
//这里把int类型的c分成了上下两部分前16位和后16位
//前16位代表读锁 后16位代表写锁
//这里是把state & 65535 得到后 16 位的值
//如果不等于0说明写锁已经被持有并且不是当前线程的话就返回-1
//如果是当前线程的话说明还有资格继续获取读锁
//这里就体现了锁降级,也就是说获取了写锁的同时也能获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//右移16位 得到c的高16位
int r = sharedCount(c);
//这里调用hasQueuedPredecessors方法,这个我们在ReentrantLock里已经讲解过了
//判断队列中是否有优先级更高的线程,hasQueuedPredecessors方法要返回false才会进行下一步
//此时compareAndSetState设置读锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//r==0说明此时读锁是没有被占用的
if (r == 0) {
//把当前线程设置成第一个读者
firstReader = current;
//并且设置读锁的拥有次数
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//这里是判断如果读锁此时已经被占用,并且第一个读线程是当前线程,次数加1
firstReaderHoldCount++;
} else {
//只有当读锁已经被占用,并且第一个读者不是当前线程的时候会进入下面
//代表的是最后一个获取读锁的线程的计数器
HoldCounter rh = cachedHoldCounter;
//如果计数器==null 或者 不是当前线程
if (rh == null || rh.tid != getThreadId(current))
//readHolds.get()获取的是当前线程的读取锁的次数
//把当前线程的计数器设置为cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
//说明计数器已经存在而且是rh是当前线程
else if (rh.count == 0)
//这里如果数量==0的话,把rh设置到readHolds中
//这个地方困惑了我挺久,看其他文章都是些啥覆盖本地..没明白
//我认为这里是因为count==0时,在fullTryAcquireShared方法中已经把readHolds给清除了,所以这里重新设置一次
readHolds.set(rh);
//入锁次数加1
rh.count++;
}
return 1;
}
//这里是死循环获取读锁
return fullTryAcquireShared(current);
给大家描述下这个方法的流程:
- 写锁存在并且不是当前线程,直接返回-1,意味着如果写锁存在并且是当前线程的话,是可以继续获取读锁的,这里体现了锁降级
- 判断是否有优先级获取读锁
- 获取读锁之后,设置firstReader ,firstReaderHoldCount, cachedHoldCounter 等参数, 这三个参数其实是为了记录读锁的获取次数和类似一个缓存的作用,主要是为了性能考虑的
- 获取失败后进入fullTryAcquireShared方法死循环获取读锁
fullTryAcquireShared
然后看一下fullTryAcquireShared方法
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
//判断是否存在写锁
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
//读者是否应该堵塞,也就是说是否有优先级更高的线程
//这里分为公平锁和非公平锁的两种实现
//返回true说明应该堵塞,返回false说明可以有资格去抢锁
//公平锁:判断队列中是否有优先级更高的节点hasQueuedPredecessors方法
//非公平锁:主要是看队列的head节点的后面,是否是写锁的等待线程,作用主要是为了防止写锁饥饿
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
//是否是重入
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
//不是重入的话
if (rh == null) {
//找到最后一个持有读锁的线程
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//读锁次数超过65535就抛出异常 这里对应16位
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//尝试加锁,和之前的逻辑一样
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
这里主要就是注意下readerShouldBlock的非公平实现的apparentlyFirstQueuedIsExclusive方法,这里的非公平其实也不是完全非公平,这里为了防止写锁饥饿做了处理
doAcquireShared
然后看看获取锁失败放入队列的doAcquireShared方法
private void doAcquireShared(int arg) {
//这里和reentrantLock的区别就是加了个shared
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//这里判断是头结点的话再次去获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取锁成功后,设置head并且唤醒下一个share结点
//这里也是共享锁和独占锁的区别
//独占锁这里是不会唤醒下一个节点的
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//唤醒队列后的直到写锁节点前的所有节点
doReleaseShared();
}
}
可以发现这个方法和reenTrantLock的acquireQueued方法差不多
差别主要是两点:
- 指定了node的shared状态
- 获取锁成功后会唤醒队列后的多个share节点
unLock
加锁的方法看完了,接下来看看解锁unLock的方法
public final boolean releaseShared(int arg) {
//对共享变量进行操作--
if (tryReleaseShared(arg)) {
//成功后唤醒下一个share线程
doReleaseShared();
return true;
}
return false;
}
//对共享变量进行操作--,这里不具体展开了
protected final boolean tryReleaseShared(int unused) {
//......
}
//这里是唤醒队列后的多个节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//①唤醒下一个节点
//如果下一个节点是读节点:抢到锁后会执行setHeadAndPropagate(node, r)方法,把自身设置成头结点,并且继续唤醒下一个读节点
//如果下一个节点是写节点:
//这里有两个情况:
//1.1,此时是setHeadAndPropagate中调用的doReleaseShared,这是读节点未解锁,写节点依然会堵塞
//1.2,此时是releaseShared解锁方法调用的doReleaseShared,读锁未被占用,写节点抢锁成功
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//什么时候会有h==head的情况呢?
//对应上面的1.1:此时写节点依然堵塞,所有head节点没有更新,此时退出循环
//对应上面的1.2:此时写节点设置head成功,所以h!=head,此时会继续循环唤醒队列中的所有的读节点
//第三种情况:队列遍历完成,此时h==head
if (h == head) // loop if head changed
break;
}
}
注意这里的unLock会唤醒队列中的所有的读节点!!!
因为这里是循环遍历,退出条件是h == head
到这里为止,读锁的加解锁的过程就分析完成了,至于写锁的过程和ReenTrantock的大同小异,这里就不再赘述了
总结
加锁流程
1,判断是否有写锁存在,如果有写锁并且写锁的持有线程不是当前线程,直接加锁失败
2,判断此时线程是否是优先级最高的线程,是的话就设置state的高位(前16位存放读锁)
3,设置锁成功后设置各种参数例如firstReader,firstReaderHoldCount,cachedHoldCounter等参数
4,如果设置锁不成功的话就把当前线程加入到CLH队列中,再次尝试抢锁,失败的话就挂起自己,等待被唤醒
5,被唤醒后,继续尝试抢锁,抢锁成功后,把自己设置为head节点,如果下一个节点也是share节点的话,会一直沿着队列唤醒直到队列的下一个写节点为止
解锁流程
1,更改共享变量
2,唤醒队列中的所有的读节点
结尾
有不当之处欢迎指出,一起学习,共勉
标签:head,读写,读锁,ReentranReadWriteLock,源码,线程,null,rh,节点 来源: https://blog.csdn.net/weixin_39827884/article/details/121104420