编程语言
首页 > 编程语言> > 并发编程-读写锁ReentranReadWriteLock应用场景及源码解析

并发编程-读写锁ReentranReadWriteLock应用场景及源码解析

作者:互联网

目录

前言

假设你现在需要实现这样一个需求

给你一个Map集合(共享资源),实现如下需求

  • 可以允许两个线程同时调用Map的get方法读取数据
  • 不允许两个线程同时调用Map的put方法修改数据
  • 不允许两个线程一个调用put方法,一个调用get方法,也就是说当一个线程调用put方法时,另一个想调用get方法的线程需要阻塞等put方法完成

你会如何实现?

这里可能会有同学想到用ConcurentHashMap,确实ConcurentHashMap可以满足第一点和第二点,但是它的put方法和get方法是相互隔离的,也就是说满足不了第三点,这种情况下就需要用到读写锁了

读写锁介绍

什么是读写锁

读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。

特性

这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读线程来访问共享资源,最大可能的读线程数为实际的逻辑CPU数。写线程是排他性的,一个读写锁同时只能有一个写线程或多个读线程(与CPU数相关),但不能同时既有读线程又有写线程。

  • 读-读能共存(可以用多个线程同时的读)
  • 读-写不能共存(读的时候不能有其他线程去修改,或者修改的时候不能有其他线程去读)
  • 写-写不能共存(修改的时候不能再有其他线程去修改)

具体实现

读写锁在Java中的具体实现是ReentranReadWriteLock

ReentranReadWriteLock实现了ReadWriteLock接口,里面定义了读锁和写锁

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

然后我们来粗略看一下ReentranReadWriteLock的基本结构

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    //内部类,读锁
    private final ReentrantReadWriteLock.ReadLock readerLock;
    //内部类,写锁
    private final ReentrantReadWriteLock.WriteLock writerLock;
    //这个sync大家应该很熟悉了吧,继承了AQS
    final Sync sync;
	
	//可以指定是否公平锁
	public ReentrantReadWriteLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();
       readerLock = new ReadLock(this);
       writerLock = new WriteLock(this);
    }
	
	//内部类ReadLock
	public static class ReadLock implements Lock, java.io.Serializable {
	
	   //里面也有一个sync
       private final Sync sync;

       //注意这里的sync是外面的ReentrantReadWriteLock传进来的
       protected ReadLock(ReentrantReadWriteLock lock) {
           sync = lock.sync;
       }

       //加锁的方法
       public void lock() {
           sync.acquireShared(1);
       }
	   //解锁
	   public void unlock() {
           sync.releaseShared(1);
       }
	   
	   .........
}

粗略看了下,我们可以得到几个信息

  • ReentrantReadWriteLock支持指定是否公平锁
  • 和ReentrantLock同样间接继承了AQS,通过sync做锁的控制
  • 通过构造器传入的同一个sync和两个不同的静态内部类来控制读写锁的分离

现在我们基本知道了ReentranReadWriteLock的内部结构,下一步我们先来看看它的一些基本使用,
实践出真知,我们来看看如何用它来解决上面的共享Map的问题

public class ReadWriteLockTest {

    public static void main(String[] args) {
        //这里使用我们自定义的map,主要是为了给大家模拟当put比较慢时会出现的情况
        TestMap<Integer, Integer> map = new TestMap<>();
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    map.put(finalI, finalI);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();

        }
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            new Thread(() -> {
                map.get(finalI);
            }).start();

        }
    }
    //定义一个map
    static class TestMap<K, T>{
        //定义一个map
        private Map<K, T> map = new HashMap<>();
        //取值的方法
        T get(K key) {
            T t = map.get(key);
            System.out.println("取到值" + t);
            return t;
        }
        //存值的方法
        void put(K key, T value) throws InterruptedException {
            //模拟map插入的过程,方便演示问题
            Thread.sleep(500);
            map.put(key, value);
            System.out.println("插入值" + key);
        }
    }
}
运行结果如下:
取到值null
取到值null
取到值null
取到值null
取到值null
插入值0
插入值1
插入值4
插入值2
插入值3

上图代码会出现的问题就是,当我在往map里面存值还没存完的时候,其他线程同时取值时,这时取出来的是空的(因为还没存进去)

那么如果要实现当有人正在存值时,此时取值的人必须先等待所有存值完毕后,再去取,该如何实现?下面我们来改造下TestMap

 //定义一个支持读写锁的map
    static class TestMap<K, T>{
        //定义一个map
        private Map<K, T> map = new HashMap<>();
        private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
        //读锁
        private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
        //写锁
        private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
        //取值的方法
        T get(K key) {
            readLock.lock();
            T t = map.get(key);
            System.out.println("取到值" + t);
            readLock.unlock();
            return t;
        }
        //存值的方法
        void put(K key, T value) throws InterruptedException {
            writeLock.lock();
            //模拟map插入的过程,方便演示问题
            Thread.sleep(500);
            map.put(key, value);
            System.out.println("插入值" + key);
            writeLock.unlock();
        }
    }
运行结果如下:
插入值0
插入值1
插入值2
插入值3
插入值4
取到值0
取到值1
取到值2
取到值3
取到值4

可以看到加锁之后的结果是符合我们的预期的,也就是写线程是优先的,这里提醒下大家unlock方法最好放到finally里面执行,防止不必要的死锁,我这里是为了方便

那么ReentranReadWriteLock源码里是怎么做到写线程独享资源, 读线程共享资源的呢?
下面让源码来解答大家的疑问

源码解析

在文章的开头我们已经了解了ReentranReadWriteLock的内部结构,首先我们来看一下读锁的lock方法

readLock.lock()

	public void lock() {
        //从名字可以发现这个是共享锁
         sync.acquireShared(1);
    }
     public final void acquireShared(int arg) {
        //尝试获取共享锁
        if (tryAcquireShared(arg) < 0)
            //获取锁失败的话会进入这里
            doAcquireShared(arg);
    }

tryAcquireShared

从名称可以看到这个是共享锁,然后我们来看看tryAcquireShared方法

 Thread current = Thread.currentThread();
            int c = getState();
            //这里把int类型的c分成了上下两部分前16位和后16位
            //前16位代表读锁 后16位代表写锁
            //这里是把state & 65535 得到后 16 位的值
            //如果不等于0说明写锁已经被持有并且不是当前线程的话就返回-1
            //如果是当前线程的话说明还有资格继续获取读锁
            //这里就体现了锁降级,也就是说获取了写锁的同时也能获取读锁
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //右移16位 得到c的高16位
            int r = sharedCount(c);
            //这里调用hasQueuedPredecessors方法,这个我们在ReentrantLock里已经讲解过了
            //判断队列中是否有优先级更高的线程,hasQueuedPredecessors方法要返回false才会进行下一步
            //此时compareAndSetState设置读锁
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                //r==0说明此时读锁是没有被占用的
                if (r == 0) {
                    //把当前线程设置成第一个读者
                    firstReader = current;
                    //并且设置读锁的拥有次数
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    //这里是判断如果读锁此时已经被占用,并且第一个读线程是当前线程,次数加1
                    firstReaderHoldCount++;
                } else {
                    //只有当读锁已经被占用,并且第一个读者不是当前线程的时候会进入下面

                    //代表的是最后一个获取读锁的线程的计数器
                    HoldCounter rh = cachedHoldCounter;
                    //如果计数器==null 或者 不是当前线程
                    if (rh == null || rh.tid != getThreadId(current))
                        //readHolds.get()获取的是当前线程的读取锁的次数
                        //把当前线程的计数器设置为cachedHoldCounter
                        cachedHoldCounter = rh = readHolds.get();
                    //说明计数器已经存在而且是rh是当前线程
                    else if (rh.count == 0)
                        //这里如果数量==0的话,把rh设置到readHolds中
                        //这个地方困惑了我挺久,看其他文章都是些啥覆盖本地..没明白
                        //我认为这里是因为count==0时,在fullTryAcquireShared方法中已经把readHolds给清除了,所以这里重新设置一次
                        readHolds.set(rh);
                    //入锁次数加1
                    rh.count++;
                }
                return 1;
            }
            //这里是死循环获取读锁
            return fullTryAcquireShared(current);

给大家描述下这个方法的流程:

  • 写锁存在并且不是当前线程,直接返回-1,意味着如果写锁存在并且是当前线程的话,是可以继续获取读锁的,这里体现了锁降级
  • 判断是否有优先级获取读锁
  • 获取读锁之后,设置firstReader ,firstReaderHoldCount, cachedHoldCounter 等参数, 这三个参数其实是为了记录读锁的获取次数和类似一个缓存的作用,主要是为了性能考虑的
  • 获取失败后进入fullTryAcquireShared方法死循环获取读锁

fullTryAcquireShared

然后看一下fullTryAcquireShared方法

final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                //判断是否存在写锁
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                    //读者是否应该堵塞,也就是说是否有优先级更高的线程
                    //这里分为公平锁和非公平锁的两种实现
                    //返回true说明应该堵塞,返回false说明可以有资格去抢锁
                    //公平锁:判断队列中是否有优先级更高的节点hasQueuedPredecessors方法
                    //非公平锁:主要是看队列的head节点的后面,是否是写锁的等待线程,作用主要是为了防止写锁饥饿
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    //是否是重入
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        //不是重入的话
                        if (rh == null) {
                            //找到最后一个持有读锁的线程
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //读锁次数超过65535就抛出异常 这里对应16位
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //尝试加锁,和之前的逻辑一样
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
		final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
        
        final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

这里主要就是注意下readerShouldBlock的非公平实现的apparentlyFirstQueuedIsExclusive方法,这里的非公平其实也不是完全非公平,这里为了防止写锁饥饿做了处理

doAcquireShared

然后看看获取锁失败放入队列的doAcquireShared方法

private void doAcquireShared(int arg) {
        //这里和reentrantLock的区别就是加了个shared
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //这里判断是头结点的话再次去获取锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //获取锁成功后,设置head并且唤醒下一个share结点
                        //这里也是共享锁和独占锁的区别
                        //独占锁这里是不会唤醒下一个节点的
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
     private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                //唤醒队列后的直到写锁节点前的所有节点
                doReleaseShared();
        }
    }

可以发现这个方法和reenTrantLock的acquireQueued方法差不多
差别主要是两点:

  • 指定了node的shared状态
  • 获取锁成功后会唤醒队列后的多个share节点

unLock

加锁的方法看完了,接下来看看解锁unLock的方法

public final boolean releaseShared(int arg) {
        //对共享变量进行操作--
        if (tryReleaseShared(arg)) {
            //成功后唤醒下一个share线程
            doReleaseShared();
            return true;
        }
        return false;
    }
    //对共享变量进行操作--,这里不具体展开了
	protected final boolean tryReleaseShared(int unused) {
           //......
        }

	//这里是唤醒队列后的多个节点
	private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    //①唤醒下一个节点
                    //如果下一个节点是读节点:抢到锁后会执行setHeadAndPropagate(node, r)方法,把自身设置成头结点,并且继续唤醒下一个读节点
                    //如果下一个节点是写节点:
                       //这里有两个情况:
                       //1.1,此时是setHeadAndPropagate中调用的doReleaseShared,这是读节点未解锁,写节点依然会堵塞
                       //1.2,此时是releaseShared解锁方法调用的doReleaseShared,读锁未被占用,写节点抢锁成功
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //什么时候会有h==head的情况呢?
            //对应上面的1.1:此时写节点依然堵塞,所有head节点没有更新,此时退出循环
            //对应上面的1.2:此时写节点设置head成功,所以h!=head,此时会继续循环唤醒队列中的所有的读节点
            //第三种情况:队列遍历完成,此时h==head
            if (h == head)                   // loop if head changed
                break;
        }
    } 

注意这里的unLock会唤醒队列中的所有的读节点!!!
因为这里是循环遍历,退出条件是h == head
到这里为止,读锁的加解锁的过程就分析完成了,至于写锁的过程和ReenTrantock的大同小异,这里就不再赘述了

总结

加锁流程

1,判断是否有写锁存在,如果有写锁并且写锁的持有线程不是当前线程,直接加锁失败
2,判断此时线程是否是优先级最高的线程,是的话就设置state的高位(前16位存放读锁)
3,设置锁成功后设置各种参数例如firstReader,firstReaderHoldCount,cachedHoldCounter等参数
4,如果设置锁不成功的话就把当前线程加入到CLH队列中,再次尝试抢锁,失败的话就挂起自己,等待被唤醒
5,被唤醒后,继续尝试抢锁,抢锁成功后,把自己设置为head节点,如果下一个节点也是share节点的话,会一直沿着队列唤醒直到队列的下一个写节点为止

解锁流程

1,更改共享变量
2,唤醒队列中的所有的读节点

结尾

有不当之处欢迎指出,一起学习,共勉

标签:head,读写,读锁,ReentranReadWriteLock,源码,线程,null,rh,节点
来源: https://blog.csdn.net/weixin_39827884/article/details/121104420