Linux内核机制—rwsem
作者:互联网
基于linux-5.10.66
一、相关结构
1. rwsem 表示结构体 struct rw_semaphore
/* * 对于无竞争的 rwsem,count 和 owner 是任务在获取 rwsem 时需要触及的唯一字段。 * 因此,它们彼此相邻放置,以增加它们共享相同cacheline的机会。 * * 在竞争 rwsem 中,owner 可能是此结构中最常访问的字段,因为持有 osq 锁的乐观 * 等待者将在 owner 上自旋。 对于嵌入的 rwsem,上层结构中的其它热字段应远离rwsem, * 以减少它们共享相同cacheline导致cacheline弹跳问题的机会。 */ struct rw_semaphore { /* 计数和位掩码组成 */ atomic_long_t count; /* task_struct指针值和位掩码组成 */ atomic_long_t owner; #ifdef CONFIG_RWSEM_SPIN_ON_OWNER //默认不使能 struct optimistic_spin_queue osq; /* spinner MCS lock */ #endif /* wait_lock 用于保护wait_list成员 */ raw_spinlock_t wait_lock; struct list_head wait_list; #ifdef CONFIG_DEBUG_RWSEMS //默认不使能 void *magic; #endif #ifdef CONFIG_DEBUG_LOCK_ALLOC //默认不使能 struct lockdep_map dep_map; #endif ANDROID_VENDOR_DATA(1); //u64 android_vendor_data1; ANDROID_OEM_DATA_ARRAY(1, 2); //u64 android_oem_data1[2]; };
成员介绍:
(1) count
/* * Bit 0 - writer locked bit * Bit 1 - waiters present bit * Bit 2 - lock handoff bit * Bits 3-7 - reserved * Bits 8-62 - 55-bit reader count * Bit 63 - read fail bit */ #define RWSEM_WRITER_LOCKED (1UL << 0) //标记有writer在临界区 #define RWSEM_FLAG_WAITERS (1UL << 1) //标记是否有waiter在等待队列上等待 #define RWSEM_FLAG_HANDOFF (1UL << 2) #define RWSEM_FLAG_READFAIL (1UL << (BITS_PER_LONG - 1)) //1<<63 最高bit位为1。表示在临界区的reader太多,计数溢出 #define RWSEM_READER_SHIFT 8 #define RWSEM_READER_BIAS (1UL << RWSEM_READER_SHIFT) //1<<8,在临界区的reader从此bit开始加1计数 #define RWSEM_READER_MASK (~(RWSEM_READER_BIAS - 1)) //低8bit清0,其它bit全为1 #define RWSEM_WRITER_MASK RWSEM_WRITER_LOCKED //1<<0 #define RWSEM_LOCK_MASK (RWSEM_WRITER_MASK|RWSEM_READER_MASK) //(1<<0)|(低8bit清0,其它bit全为1) 表示是否有writer或reader在临界区中 //只要sem->count 和其与不为0,reader就不能走快速路径持锁 #define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF|RWSEM_FLAG_READFAIL) //(1<<0)|(1<<1)|(1<<2)|(1<<63)
RWSEM_FLAG_READFAIL: 最高有效位(读取失败位)不太可能被设置。 无论如何,在 down_read() 快速路径中仍会检查此保护位,以防我们将来需要将更多reader位用于其他目的。
RWSEM_READ_FAILED_MASK: reader获取锁时若sem->count中包含其内的任意一个掩码,就无法走快速获取获取读锁。
RWSEM_FLAG_HANDOFF: 可以设置或清除handoff标志位的三个位置。
1) rwsem_mark_wake() 读者使用的此函数。
2) rwsem_try_write_lock() 写者使用的此函数。
3) rwsem_down_write_slowpath() 中的错误路径。
对于上述所有情况,wait_lock 都会被持有。writer还必须是 wait_list 中第一个waiter以便可以设置handoff bit位。 因此无法同时设置/清除handoff bit位。
(2) owner
由于至少是8字节对齐,owner的低3bit可以另做它用,如下。其它bit位保存的是持有此rwsem的task_struct结构。
#define RWSEM_READER_OWNED (1UL << 0) // The rwsem is owned by readers #define RWSEM_RD_NONSPINNABLE (1UL << 1) // Readers cannot spin on this lock #define RWSEM_WR_NONSPINNABLE (1UL << 2) // Writers cannot spin on this locks #define RWSEM_NONSPINNABLE (RWSEM_RD_NONSPINNABLE | RWSEM_WR_NONSPINNABLE) //(1<<1)|(1<<2) #define RWSEM_OWNER_FLAGS_MASK (RWSEM_READER_OWNED | RWSEM_NONSPINNABLE) //(1<<0)|(1<<1)|(1<<2)
只有 reader_owned 宏,没有 writer_owned 宏。有设置这个 reader_owned 的逻辑,设置位置为reader持锁进入临界区的位置,但是没有直接清
理这个 reader_owned 的逻辑,但是不需要做额外的清理动作,因为writer持锁时rwsem_set_owner() 中直接 sem->owner = current 进行赋值,
会清理掉reader_owner标志位。
对于多个reader持锁进入临界区,owner保存的是最后一个进入临界区的reader的task_struct结构指针。
(3) wait_list
等待队列,当reader或writer获取不到rwsem而阻塞时,会挂载此链表上。默认是尾插法,从头部唤醒,当做一个队列来使用了。
2. rwsem 等待者结构 rwsem_waiter
struct rwsem_waiter { /* 通过此成员挂载 rwsem->wait_list 链表上 */ struct list_head list; /* waiter的 task_struct 结构 */ struct task_struct *task; /* 标记是reader还是writer,分别取值为 RWSEM_WAITING_FOR_READ/RWSEM_WAITING_FOR_WRITE */ enum rwsem_waiter_type type; /* 挂入等待链表之前设置,通常是4ms */ unsigned long timeout; /* 上一个reader owner */ unsigned long last_rowner; };
二、相关函数
1. 初始化函数
/* 初始化宏,将 sem->count 设置为 RWSEM_UNLOCKED_VALUE(8), 将 sem->owner 设置为0 */ init_rwsem(sem)
2. 获取读sem
/* 阻塞时是 TASK_UNINTERRUPTIBLE 状态 */ void __sched down_read(struct rw_semaphore *sem); /* 阻塞时是 TASK_INTERRUPTIBLE 状态,writer没有类似接口 */ int __sched down_read_interruptible(struct rw_semaphore *sem) /* 阻塞时是 TASK_KILLABLE 状态 */ int __sched down_read_killable(struct rw_semaphore *sem) /* 尝试获取读锁,不会阻塞,成功获取返回1,失败返回0 */ int down_read_trylock(struct rw_semaphore *sem)
3. 获取写sem
/* 阻塞时是 TASK_UNINTERRUPTIBLE 状态 */ void __sched down_write(struct rw_semaphore *sem) /* 阻塞时是 TASK_KILLABLE 状态 */ int __sched down_write_killable(struct rw_semaphore *sem) /* 尝试获取写锁,不会阻塞,成功获取返回1,失败返回0 */ int down_write_trylock(struct rw_semaphore *sem)
4. 释放读sem
void up_read(struct rw_semaphore *sem)
5. 释放写sem
void up_write(struct rw_semaphore *sem)
可见rwsem只是一个二值信号量,不支持递归获取,获取不到要进入休眠。
三、获取读信号流程
1. down_read
void __sched down_read(struct rw_semaphore *sem) { might_sleep(); /* 若没有使能 CONFIG_LOCKDEP 就是空函数,默认不使能的 */ rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_); /* 若没有使能 CONFIG_LOCK_STAT 就等于 __down_read(sem) */ LOCK_CONTENDED(sem, __down_read_trylock, __down_read); } static inline void __down_read(struct rw_semaphore *sem) { /* * 先执行尝试获取信号量,若获取不到再进入慢速路径,若获取 * 到了就将sem->owner设置为当前线程。 */ if (!rwsem_read_trylock(sem)) { rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE); /*若不使能 CONFIG_DEBUG_RWSEMS 就是空函数,默认不使能 */ DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem); } else { rwsem_set_reader_owned(sem); } }
走慢速路径之前会调用一次 rwsem_read_trylock() 尝试获取锁,若获取不到就会走慢速路径,获取到后走快速路径。
(1) rwsem_read_trylock(sem) 判断是否能持有读取锁
/* 这个函数只是一个判断,能成功持锁返回1,否则返回0 */ static inline bool rwsem_read_trylock(struct rw_semaphore *sem) { /* 作用:sem->count += RWSEM_READER_BIAS */ long cnt = atomic_long_add_return_acquire(RWSEM_READER_BIAS, &sem->count); /* * 最高bit位为1才会小于0, RWSEM_FLAG_READFAIL 是bit63,但是没有使用过, * 这里应该只是表示reader持锁个数已经导致bit位溢出,应该不会发生这种情况。 */ if (WARN_ON_ONCE(cnt < 0)) rwsem_set_nonspinnable(sem); /* 只要这些标志中sem->count中有1个,reader就不能在快速路径中获取到锁 */ return !(cnt & RWSEM_READ_FAILED_MASK); }
若此时已经有 writer 在临界区了,也就是说 cnt 中包含了 RWSEM_READ_FAILED_MASK 中的 RWSEM_WRITER_MASK,reader 就无法走快速路径获取到锁了,而是要挂到等待链表中等待。
可以再看下面 RWSEM_READ_FAILED_MASK 的定义,只要有writer在临界区、有waiter在等待、写者持锁慢速路径中对wait状态的写者标记了handoff、写者标记了handoff、reader等待的太多(bit63==1),读者都不能走快速路径持锁。
(2) rwsem_set_reader_owned(sem) 持锁快速路径
reader持锁,在sem->owner上或上 RWSEM_READER_OWNED 并保留owner中原来的 RWSEM_RD_NONSPINNABLE 位的值。最后一个持锁的reader才会将其task_struct结构指针设置到sem->owner中。
static inline void rwsem_set_reader_owned(struct rw_semaphore *sem) { __rwsem_set_reader_owned(sem, current); } /* * 最后拥有的 reader 的 task_struct 指针将留在 owner 字段中。 * * 请注意,owner 的值仅表示该任务之前拥有了该 rwsem,当检查该字段时,它可能不 * 再是真正的所有者或真正的所有者之一,因此请谨慎对待。 * * reader的non-spinnable bit位被保留下来。 * * 传参:owner = current * 作用:sem->owner = owner | RWSEM_READER_OWNED | (sem->owner & RWSEM_RD_NONSPINNABLE) */ static inline void __rwsem_set_reader_owned(struct rw_semaphore *sem, struct task_struct *owner) { unsigned long val = (unsigned long)owner | RWSEM_READER_OWNED | (atomic_long_read(&sem->owner) & RWSEM_RD_NONSPINNABLE); atomic_long_set(&sem->owner, val); }
(3) rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE) 持读取锁的慢速路径
/* * Wait for the read lock to be granted. * __down_read()传参:(sem, TASK_UNINTERRUPTIBLE) */ static struct rw_semaphore __sched * rwsem_down_read_slowpath(struct rw_semaphore *sem, int state) { /* 每增加一个等待的reader就从bit8的reader计算bit位开始增加1 */ long count, adjustment = -RWSEM_READER_BIAS; struct rwsem_waiter waiter; DEFINE_WAKE_Q(wake_q); bool wake = false; bool already_on_list = false; /* * Save the current read-owner of rwsem, if available, and the * reader nonspinnable bit. * 先将 sem->owner 保存到 waiter.last_rowner。在reader的持锁快速路径中 * sem->owner 保存的是最后一个持读取锁的reader. */ waiter.last_rowner = atomic_long_read(&sem->owner); /* * 在reader快速路径持锁时,都会 sem->owner 都会或上 RWSEM_READER_OWNED * 标志。若没有此标志说明上次reader已经不是走快速路径持锁的了。此时 * waiter.last_rowner 中只保留 RWSEM_RD_NONSPINNABLE bit位的值【为啥? * 此bit在默认deconfig下是否有使用 ?】。 */ if (!(waiter.last_rowner & RWSEM_READER_OWNED)) waiter.last_rowner &= RWSEM_RD_NONSPINNABLE; /* 默认没有使能 CONFIG_RWSEM_SPIN_ON_OWNER,此函数恒返回0 */ if (!rwsem_can_spin_on_owner(sem, RWSEM_RD_NONSPINNABLE)) goto queue; .... queue: /* 标记waiter的线程、类型、超时时间【超时时间怎么使用?】 */ waiter.task = current; waiter.type = RWSEM_WAITING_FOR_READ; waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT; //jiffies+1, 即是4ms raw_spin_lock_irq(&sem->wait_lock); if (list_empty(&sem->wait_list)) { /* * 翻译:如果等待队列为空,并且锁没有被writer持有或handoff位未被设置, * 则此reader可以退出慢速路径并立即返回,因为其 RWSEM_READER_BIAS * 位已被设置到count成员中【什么时候设置进去的?】。 * * 默认配置下 adjustment = -RWSEM_READER_BIAS 恒成立。 * 没有 writer 持锁,且没有指定 handoff。 */ if (adjustment && !(atomic_long_read(&sem->count) & (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) { /* Provide lock ACQUIRE */ smp_acquire__after_ctrl_dep(); //只是smp_rmb() raw_spin_unlock_irq(&sem->wait_lock); /* * 将current线程当做reader设置进sem->owner中,然后就返回了。 * 这体现了多个reader可以同时获取锁。 */ rwsem_set_reader_owned(sem); /* CONFIG_LOCK_EVENT_COUNTS 默认没有使能,是空函数 */ lockevent_inc(rwsem_rlock_fast); return sem; } /* 等待链表为空,但是有writer持锁或指定了handoff bit位,准备好有waiter等待bit位 */ adjustment += RWSEM_FLAG_WAITERS; } /*----下面就是等待链表不为空(有writer持锁或指定了handoff标志)的处理情况了----*/ trace_android_vh_alter_rwsem_list_add(&waiter, sem, &already_on_list); if (!already_on_list) /* 默认逻辑是添加到等待队列的尾部 */ list_add_tail(&waiter.list, &sem->wait_list); /* we're now waiting on the lock, but no longer actively locking */ if (adjustment) /* * 相当于 sem->count = sem->count - RWSEM_READER_BIAS + RWSEM_FLAG_WAITERS * 减去 RWSEM_READER_BIAS 应该是因为在前面的trey_lock中加了。多了一个waiter。 */ count = atomic_long_add_return(adjustment, &sem->count); else count = atomic_long_read(&sem->count); //不会执行到 /* * 翻译: * 如果没有处于active状态的锁,唤醒排队前面的任务。 * 如果没有writers并且我们排在队列的首位,唤醒我们自己的waiter加入现有的活跃readers! * active 锁应该表示获取不到sem而挂在等待链表上的reader和writer. */ if (!(count & RWSEM_LOCK_MASK)) { /* 没有使能 CONFIG_RWSEM_SPIN_ON_OWNER, 是个空函数 */ clear_wr_nonspinnable(sem); wake = true; } /* * 若没有writer持锁,并且sem->wait_list链表为空(adjustment中有RWSEM_FLAG_WAITERS * bit位表示), 那么就标记要唤醒的任务。 */ if (wake || (!(count & RWSEM_WRITER_MASK) && (adjustment & RWSEM_FLAG_WAITERS))) rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); /*-又重新看了-*/ trace_android_vh_rwsem_wake(sem); raw_spin_unlock_irq(&sem->wait_lock); /* 唤醒queue入等待队列中的任务 */ wake_up_q(&wake_q); /* 这不是睡眠吗,,为啥执行起了唤醒逻辑?*/ /* ----下面就是有writer持有信号量,reader要进入休眠的情况了---- */ /* wait to be given the lock */ trace_android_vh_rwsem_read_wait_start(sem); for (;;) { /* __down_read()传参:state == TASK_UNINTERRUPTIBLE) */ set_current_state(state); /* 如果 waiter.task 为空,退出死循环 */ if (!smp_load_acquire(&waiter.task)) { /* Matches rwsem_mark_wake()'s smp_store_release(). */ break; } /* 若传参state==TASK_UNINTERRUPTIBLE,这里不可能为真 */ if (signal_pending_state(state, current)) { raw_spin_lock_irq(&sem->wait_lock); if (waiter.task) goto out_nolock; raw_spin_unlock_irq(&sem->wait_lock); /* Ordered by sem->wait_lock against rwsem_mark_wake(). */ break; } schedule(); lockevent_inc(rwsem_sleep_reader); } __set_current_state(TASK_RUNNING); trace_android_vh_rwsem_read_wait_finish(sem); lockevent_inc(rwsem_rlock); /* 获得到信号量之后的返回值 */ return sem; out_nolock: //被信号唤醒走这里 list_del(&waiter.list); if (list_empty(&sem->wait_list)) { atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF, &sem->count); } raw_spin_unlock_irq(&sem->wait_lock); __set_current_state(TASK_RUNNING); trace_android_vh_rwsem_read_wait_finish(sem); lockevent_inc(rwsem_rlock_fail); /* 被信号唤醒时的返回值 */ return ERR_PTR(-EINTR); }
在获取不到信号量,进入休眠之前,还尝试标记唤醒。
rwsem_mark_wake() 函数:
/* * 翻译: * 当被阻塞的线程可以运行时处理释放锁的逻辑: * - 如果我们从 up_xxxx() 来到这里,那么 RWSEM_FLAG_WAITERS 位必须已被设置。 * - 队列中必须有排队的任务 * - wait_lock 必须被调用者持有 * - 任务标记唤醒,调用者必须稍后调用 wake_up_q() 来唤醒阻塞的任务,并降低引用计数,最好是在释放 wait_lock 时做。 * - 任务归零后被唤醒的任务从链表中删除 * - 只有当降级为假时,writers才会被标记为唤醒 */ /* rwsem_down_read_slowpath()传参:(sem, RWSEM_WAKE_ANY, &wake_q) */ static void rwsem_mark_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type, struct wake_q_head *wake_q) { struct rwsem_waiter *waiter, *tmp; long oldcount, woken = 0, adjustment = 0; struct list_head wlist; lockdep_assert_held(&sem->wait_lock); /* * 翻译:获取队列头部的waiter,以便我们确定要执行的唤醒。 * 是从 sem->wait_list 链表首,取一个任务来唤醒的。 */ waiter = rwsem_first_waiter(sem); /* * 若等待链表头部排队的是一个writer, 而且唤醒类型是ANY, 那么直接 * 将这个writer放到待唤醒链表上。 */ if (waiter->type == RWSEM_WAITING_FOR_WRITE) { if (wake_type == RWSEM_WAKE_ANY) { /* * 翻译:在队列前面标记 writer 以唤醒。直到调用者稍后真正唤醒了任务, * 其他writer才能窃取它。另一方面,读者会阻塞,因为他们会注意到排队的writer。 */ wake_q_add(wake_q, waiter->task); /* 没使能CONFIG_LOCK_EVENT_COUNTS,是个空函数 */ lockevent_inc(rwsem_wake_writer); } return; } /*---- 下面就是sem的等待链表首不是writer或唤醒类型不是ANY的情况 ----*/ /* * 翻译:如果有太多读者等待了,则不会唤醒读者。最高bit为1才是小于0,实际情况下 * 应该不大会出现。 */ if (unlikely(atomic_long_read(&sem->count) < 0)) return; /* * 翻译:在我们将锁授予下一个读者之前,写者可能会偷走它。 * 我们更喜欢在计算读者数之前进行第一读者授权,这样如果写者偷了锁, * 我们可以提前退出。 */ if (wake_type != RWSEM_WAKE_READ_OWNED) { struct task_struct *owner; adjustment = RWSEM_READER_BIAS; /* 应该是先将sem->count的值返回,再加上adjustment的值 (1)*/ oldcount = atomic_long_fetch_add(adjustment, &sem->count); /* 若此刻有写者持有锁 */ if (unlikely(oldcount & RWSEM_WRITER_MASK)) { /* * 翻译:当我们已经等待“太”长时间(等待写者释放锁)时,请求 HANDOFF 以强制 * 解决该问题。 */ if (!(oldcount & RWSEM_FLAG_HANDOFF) && time_after(jiffies, waiter->timeout)) { /* 这里是减,减减就是加【handoff是专门针对写者吗?】 */ adjustment -= RWSEM_FLAG_HANDOFF; lockevent_inc(rwsem_rlock_handoff); } /* 对 RWSEM_READER_BIAS 加减相互抵消,相当于 sem->count += RWSEM_FLAG_HANDOFF */ atomic_long_add(-adjustment, &sem->count); return; } /*---- 下面就是当前没有writer在等待sem的情况了 ----*/ /* * 翻译:将其设置为 reader-owned 可以让 spinners 及早感知到现在是读者拥有锁。 * 在读者的慢速路径入口处看到的读者不可spin位被复制。 */ owner = waiter->task; /* 若上个被阻塞的reader的nom-spin位被设置,则继承,仍然被设置 */ if (waiter->last_rowner & RWSEM_RD_NONSPINNABLE) { owner = (void *)((unsigned long)owner | RWSEM_RD_NONSPINNABLE); lockevent_inc(rwsem_opt_norspin); } /* 设置等待链表上的首个reader持有锁 */ __rwsem_set_reader_owned(sem, owner); } /* * 翻译: * 向队列中的所有读者授予最多 MAX_READERS_WAKEUP 读取锁。 我们知道被唤醒任 * 务至少是1个,正如我们上面所解释的。请注意,在唤醒任何进程之前,我们将计 * 数的“active part”增加读者的数量。 * * 这是对相位公平 R/W 锁的改编,其中在读者阶段(第一个等待者是读者),所有 * 读者都有资格同时获取锁,而不管他们在队列中的顺序如何。 写者根据他们在队 * 列中的顺序获取锁。 * * 我们必须在 2 遍中进行唤醒,以防止读者计数可能在增加之前减少。 因为将被唤 * 醒的写者可能还没睡。所以它可能会看到 waiter->task 被清除,执行完它的临界 * 区部分并在读者计数增加之前进行解锁。 * * 1) 在单独的链表中收集读取等待者,对它们进行计数并在 rwsem 中增加读者计数。 * 2) 对于新链表中的每一个waiter,清空waiter->task 并放入wake_q 以便稍后唤醒。 */ INIT_LIST_HEAD(&wlist); list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) { /* 跳过等待链表上的writer */ if (waiter->type == RWSEM_WAITING_FOR_WRITE) continue; woken++; /* 将等待链表上的reader摘取下来挂在临时链表wlist的尾部 */ list_move_tail(&waiter->list, &wlist); /* 翻译:限制每次调用可以唤醒的读者数量,256个, 应该很难达到。*/ if (woken >= MAX_READERS_WAKEUP) break; } /* 减去应该是去除 (1) 位置的影响*/ adjustment = woken * RWSEM_READER_BIAS - adjustment; lockevent_cond_inc(rwsem_wake_reader, woken); /* * 等待链表已经为空了说明等待链表上没有writer(因为没有往下摘writer), * 也没有reader了,这里减去有waiter的标志 */ if (list_empty(&sem->wait_list)) { /* hit end of list above */ adjustment -= RWSEM_FLAG_WAITERS; } /* * 翻译:当我们唤醒读者后,就不再需要强制写者放弃锁,可以清除 HANDOFF 标志了。【?】 */ if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF)) adjustment -= RWSEM_FLAG_HANDOFF; /* * 相当于进来时 sem->count 的状态加上 woken * RWSEM_READER_BIAS,并且没有handoff * 标志。 * 可见(要)唤醒状态的reader才会出记录在sem->count中,已经被阻塞的不会。 */ if (adjustment) atomic_long_add(adjustment, &sem->count); /* 2nd pass */ /* 临时链表 wlist 上只挂入了reader */ list_for_each_entry_safe(waiter, tmp, &wlist, list) { struct task_struct *tsk; tsk = waiter->task; /* 增加计数,防止使用过程中 task_struct 结构被释放 */ get_task_struct(tsk); /* * 翻译:确保在将 reader waiter 设置为 NULL 之前调用 get_task_struct(), * 这样 rwsem_down_read_slowpath() 在整个唤醒任务的过程中始终保持引用 * 计数而不会和 do_exit() 存在竞争。 */ /* waiter->task == NULL 【waiter->task 设置为NULL是什么意思?】*/ smp_store_release(&waiter->task, NULL); /* 翻译:确保在将 reader waiter 设置为 NULL 后开始唤醒(由我们或其他人) */ wake_q_add_safe(wake_q, tsk); } }
主要逻辑:若链表首是个writer,且唤醒类型是ANY,只唤醒此writer就返回。若链表首不是writer,就唤醒等待队列中的所有reader。唤醒所有的reader能获得更高的并发效果,因为所有的reader能同时进入临界区。queue在等待队列中的writer是还没有进入临界区的。
sem->count 中对于 writer 只有一个标记,唤醒 writer 没有像唤醒 reader 那样从 sem->count 中减去对 reader 的计数。
四、获取写信号量流程
1. down_write()
/* * lock for writing */ void __sched down_write(struct rw_semaphore *sem) { might_sleep(); rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_); /* 没有使能CONFIG_LOCK_STAT,等效于__down_write(sem) */ LOCK_CONTENDED(sem, __down_write_trylock, __down_write); } EXPORT_SYMBOL(down_write); /* * lock for writing */ static inline void __down_write(struct rw_semaphore *sem) { long tmp = RWSEM_UNLOCKED_VALUE; /* * atomic_long_try_cmpxchg_acquire(atomic_long_t *v, long *old, long new) * 若*v==*old, *v=new, return true * 若*v!=*old, *old=*v, return false * * 作用是原子的执行: * if (sem->count == RWSEM_UNLOCKED_VALUE) { * sem->count = RWSEM_WRITER_LOCKED; * } */ if (unlikely(!atomic_long_try_cmpxchg_acquire(&sem->count, &tmp, RWSEM_WRITER_LOCKED))) { rwsem_down_write_slowpath(sem, TASK_UNINTERRUPTIBLE); } else { /* 当前sem还没被持有,writer快速持有路径。直接 sem->owner = current */ rwsem_set_owner(sem); } }
(1) 尝试获取写锁
判断 sem->count 是否为0,若是,就将起赋值为 RWSEM_WRITER_LOCKED,表示 writer 持有锁了,返回真。
(2) 持写锁的快速路径
上面尝试获取锁成功后,直接 sem->owner = current,表示持锁的是当前任务。
(3) 持写锁的慢速路径
若前面尝试持写锁失败,将进入持写锁慢速路径。执行 rwsem_down_write_slowpath() 函数
/* * Wait until we successfully acquire the write lock. * * __down_write 传参:(sem, TASK_UNINTERRUPTIBLE) */ static struct rw_semaphore *rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) { long count; bool disable_rspin; enum writer_wait_state wstate; struct rwsem_waiter waiter; struct rw_semaphore *ret = sem; DEFINE_WAKE_Q(wake_q); bool already_on_list = false; /* do optimistic spinning and steal lock if possible */ /* CONFIG_RWSEM_SPIN_ON_OWNER 默认不使能,恒返回false,不执行 */ if (rwsem_can_spin_on_owner(sem, RWSEM_WR_NONSPINNABLE) && rwsem_optimistic_spin(sem, true)) { /* rwsem_optimistic_spin() implies ACQUIRE on success */ return sem; } /* * 翻译:当观察到不可spin bit位被设置时,在获取写锁后关闭此rwsem的reader的乐观自旋。 * 检索此标志没有被赋值过,disable_rspin 应该恒为假 */ disable_rspin = atomic_long_read(&sem->owner) & RWSEM_NONSPINNABLE; /* 翻译:乐观自旋失败,继续慢速路径并阻塞,直到可以获取sem。*/ waiter.task = current; waiter.type = RWSEM_WAITING_FOR_WRITE; waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT; //jiffies + 4ms raw_spin_lock_irq(&sem->wait_lock); /* account for this before adding a new element to the list */ wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST; trace_android_vh_alter_rwsem_list_add(&waiter, sem, &already_on_list); if (!already_on_list) /* 默认是插入等待队列的尾部 */ list_add_tail(&waiter.list, &sem->wait_list); /* we're now waiting on the lock */ /* * 插入时等待队列不为空就是NOT_FIRST。此时可能是只有一个writer在临界区,等待链表上 * 都是reader,也可能已经有writer在等待链表上了,reader和writer都有。 */ if (wstate == WRITER_NOT_FIRST) { count = atomic_long_read(&sem->count); /* * 翻译:如果在我们之前已经有排队等待的任务了,并且: * 1) 没有active状态的锁,唤醒前面排队的进程,因为可能设置了handoff bit位。 * 2) 没有active状态的写者和一些读者,锁必须是读拥有的;所以我们尝试唤醒任 * 何排在我们前面的读锁等待者(由于是尾插法,等待链表上已有的所有读者都在我们前面)。 */ /* * 之前持锁的writer是不是已经释放锁了,若是还没有释放,那么当前等待持锁的writer * 直接去休眠等待。 */ if (count & RWSEM_WRITER_MASK) goto wait; /* 若之前持锁的writer已经释放锁了,那么接下来就把等待的reader都唤醒吧 */ rwsem_mark_wake(sem, (count & RWSEM_READEMASK) ? RWSEM_WAKE_READERS : RWSEM_WAKE_ANY, &wake_q); if (!wake_q_empty(&wake_q)) { /* 翻译:我们希望最小化 wait_lock 保持时间,尤其是在要唤醒大量读者时。*/ raw_spin_unlock_irq(&sem->wait_lock); /* 唤醒等待队列上的所有任务 */ wake_up_q(&wake_q); /* Used again, reinit */ wake_q_init(&wake_q); raw_spin_lock_irq(&sem->wait_lock); } } else { /* * 若插入时writer是等待链表上为空, 也就是此时只有reader持有锁, * 只或上这个标志位,表示有writer在等待锁。 */ atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count); } wait: trace_android_vh_rwsem_wake(sem); /* wait until we successfully acquire the lock */ trace_android_vh_rwsem_write_wait_start(sem); /* __down_write 传参:state == TASK_UNINTERRUPTIBLE */ set_current_state(state); /* 第一层循环 */ for (;;) { /* * 成功获取sem返回true, 失败返回false。这里若传 wstate = WRITER_HANDOFF, * 会赋值到 sem->count 中。 */ if (rwsem_try_write_lock(sem, wstate)) { /* * 外层for循环唯一退出位置。唯一退出条件是成功持有sem。退出时 * sem->wait_lock 是锁定状态的。 */ break; } raw_spin_unlock_irq(&sem->wait_lock); /* * 翻译: * 在未能获取锁和设置了handoff bit位后,尝试在sem的owner上自 * 旋以加速锁的传递。如果前一个owner是一个on-cpu(正在运行)的writer * 并且它刚刚释放了锁,则将返回 OWNER_NULL。在这种情况下,我们尝试 * 在不休眠的情况下再次获取锁。 * * CONFIG_RWSEM_SPIN_ON_OWNER 默认不使能,此if恒不成立。 */ if (wstate == WRITER_HANDOFF && rwsem_spin_on_owner(sem, RWSEM_NONSPINNABLE) == OWNER_NULL) goto trylock_again; //恒不执行 /* Block until there are no active lockers. */ /* active lockers:位于临界区的入任务。睡眠等待,直到自己是 WRITER_FIRST 才能退出 */ /* 第二层循环 */ for (;;) { /* __down_write 传参 state==TASK_UNINTERRUPTABLE, 恒不成立*/ if (signal_pending_state(state, current)) goto out_nolock; /* 将当前任务切走 */ schedule(); lockevent_inc(rwsem_sleep_writer); set_current_state(state); /* If HANDOFF bit is set, unconditionally do a trylock */ if (wstate == WRITER_HANDOFF) break; /* * 若之前writer不是等待链表中的第一个waiter, 而现在是一个waiter了,更新wstate。 * 这里是一级循环退出的唯一条件:WRITER_FIRST --> WRITER_HANDOFF --> break */ if (wstate == WRITER_NOT_FIRST && rwsem_first_waiter(sem) == &waiter) wstate = WRITER_FIRST; /* 若此时writer和reader都没有再持有sem */ count = atomic_long_read(&sem->count); if (!(count & RWSEM_LOCK_MASK)) break; /* * 翻译:handoff 位的设置被推迟到 rwsem_try_write_lock() 被调用时。 * * 此 writer 必须是等待链表上的首个waiter了,并且已经在这里是spin大于4ms了 */ if (wstate == WRITER_FIRST && (rt_task(current) || time_after(jiffies, waiter.timeout))) { wstate = WRITER_HANDOFF; lockevent_inc(rwsem_wlock_handoff); break; } } trylock_again: raw_spin_lock_irq(&sem->wait_lock); } __set_current_state(TASK_RUNNING); trace_android_vh_rwsem_write_wait_finish(sem); /* 将此waiter从等待链表中移除 */ list_del(&waiter.list); /* 若之前是no-spin,退出时还是设置为no-spin */ rwsem_disable_reader_optspin(sem, disable_rspin); raw_spin_unlock_irq(&sem->wait_lock); lockevent_inc(rwsem_wlock); return ret; out_nolock: //被信号唤醒,此Case这里恒不会被执行 __set_current_state(TASK_RUNNING); trace_android_vh_rwsem_write_wait_finish(sem); raw_spin_lock_irq(&sem->wait_lock); list_del(&waiter.list); if (unlikely(wstate == WRITER_HANDOFF)) atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count); if (list_empty(&sem->wait_list)) atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count); else rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); raw_spin_unlock_irq(&sem->wait_lock); wake_up_q(&wake_q); lockevent_inc(rwsem_wlock_fail); return ERR_PTR(-EINTR); }
a. reader 获取sem锁时,若是没有 writer 持有,只是在 sem->count 上增加读者计数。之后在 writer 持锁时,发现 sem->count 不为0就会
进入慢速持锁路径,放在等待链表上休眠。若之前持锁的reader在休眠,这里什么也没有做,只是静静的等待reader释放锁。只有在 writer
持锁进入慢速路径时,发现等待链表上已经有任务了,并且之前持锁的writer已经释放锁了,才会尝试将之前阻塞的reader一下子都唤醒。
b. 进入慢速路径的writer若发现 sem->wait_list 等待链表不为空,这说明之前已经有一个writer和若干个reader在等待链表上了。
c. 虽然 sem->count 上 writer 只有一个bit的标志,标识同一时间只能有一个writer进入临界区。但是 sem->wait_list 链表上等待的 writer 却可以同时有多个,它们都不在临界区中。
d. 一直等待,直到此writer成功等待链表上首个waiter了。若它是rt线程或等待时间已经超过了4ms, 就会启用 WRITER_HANDOFF 进行锁交接。
(2) rwsem_try_write_lock() 函数
/* * 翻译: * 必须在持有 sem->wait_lock 的情况下调用此函数,以防止检查 rwsem 等待链表和 * 相应地设置 sem->count 之间的竞争条件。 * * 如果 wstate 为 WRITER_HANDOFF,它将确保设置了切换位或在清除切换位的情况下 * 获取锁。 * * rwsem_down_write_slowpath 传参 wstate 为 WRITER_FIRST 或 WRITER_NOT_FIRST, * 第二轮也可能传参为 WRITER_HANDOFF. * * 成功获取sem返回true, 失败返回false并设置handoff bit位 */ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, enum writer_wait_state wstate) { long count, new; lockdep_assert_held(&sem->wait_lock); count = atomic_long_read(&sem->count); do { bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF); /* 看来handoff主要是针对等待队列中首个waiter是writer的情况 */ if (has_handoff && wstate == WRITER_NOT_FIRST) return false; new = count; /* 是否有writer或reader在等待sem */ if (count & RWSEM_LOCK_MASK) { /* * 虽然还有持锁,但是传参handoff标志,也不返回false退出了, * 后面这个handoff标志会设置到sem->count中。 */ if (has_handoff || wstate != WRITER_HANDOFF) return false; new |= RWSEM_FLAG_HANDOFF; } else { /* * 既没有writer又没有reader在等待sem,此writer就可以持锁了, * 或上writer标志,清除掉handoff标志。 */ new |= RWSEM_WRITER_LOCKED; new &= ~RWSEM_FLAG_HANDOFF; /* 若等待链表中只有一个元素(本writer自己),标记等待链表上没有waiters */ if (list_is_singular(&sem->wait_list)) new &= ~RWSEM_FLAG_WAITERS; } /* * 若 sem->count==count, sem->count=new, return true; * 若 sem->count!=count, count=sem->count, return false; */ } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new)); /* * We have either acquired the lock with handoff bit cleared or * set the handoff bit. * 翻译:走到这里了还没退出,说明我们要么获得了锁并清除了handoff bit位,就返回true; * 要么设置了handoff bit位, 返回false. */ if (new & RWSEM_FLAG_HANDOFF) return false; /* 直接单纯是sem->owner=current */ rwsem_set_owner(sem); return true; }
五、释放读信号流程
1. up_read() 函数
/* release a read lock */ void up_read(struct rw_semaphore *sem) { rwsem_release(&sem->dep_map, _RET_IP_); __up_read(sem); } EXPORT_SYMBOL(up_read); /* unlock after reading */ static inline void __up_read(struct rw_semaphore *sem) { long tmp; /* CONFIG_DEBUG_RWSEMS 默认不使能,是空函数 */ DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem); DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem); /* CONFIG_DEBUG_RWSEMS 默认不使能,是个空函数 */ rwsem_clear_reader_owned(sem); /* sem->count 中的 reader 计数减去1 */ tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count); /* CONFIG_DEBUG_RWSEMS 默认不使能,是个空函数 */ DEBUG_RWSEMS_WARN_ON(tmp < 0, sem); /* 若是有waiter, 才执行唤醒流程。[或上 RWSEM_LOCK_MASK 有什么意义呢,作者笔误?] */ if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) == RWSEM_FLAG_WAITERS)) { clear_wr_nonspinnable(sem); rwsem_wake(sem, tmp); } }
(1) rwsem_wake() 函数
/* * 翻译: * 处理唤醒信号量上的waiter * - 如果我们来到这里,up_read/up_write 会减少 count 的活动部分 */ static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count) { unsigned long flags; DEFINE_WAKE_Q(wake_q); raw_spin_lock_irqsave(&sem->wait_lock, flags); /* 若等待链表不为空,就执行唤醒动作 */ if (!list_empty(&sem->wait_list)) /* 标记待唤醒的waiter */ rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); trace_android_vh_rwsem_wake_finish(sem); raw_spin_unlock_irqrestore(&sem->wait_lock, flags); /* 执行唤醒操作 */ wake_up_q(&wake_q); return sem; }
六、释放写信号量流程
1. up_write() 函数
/* release a write lock */ void up_write(struct rw_semaphore *sem) { rwsem_release(&sem->dep_map, _RET_IP_); trace_android_vh_rwsem_write_finished(sem); __up_write(sem); } EXPORT_SYMBOL(up_write); /* unlock after writing */ static inline void __up_write(struct rw_semaphore *sem) { long tmp; /* CONFIG_DEBUG_RWSEMS 默认不使能,是个空函数 */ DEBUG_RWSEMS_WARN_ON(sem->magic != sem, sem); /* * 翻译:如果通过设置 RWSEM_NONSPINNABLE 位将所有权转移给匿名写入者,则 * sem->owner 可能与当前不同。 * * CONFIG_DEBUG_RWSEMS 默认不使能,是个空函数 */ DEBUG_RWSEMS_WARN_ON((rwsem_owner(sem) != current) && !rwsem_test_oflags(sem, RWSEM_NONSPINNABLE), sem); /* 直接 sem->owner=0 */ rwsem_clear_owner(sem); /* 原子的读取-修改-写入,等效于 return sem->count; sem->count += -RWSEM_WRITER_LOCKED; */ tmp = atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED, &sem->count); /* 有等待的waiter才执行唤醒操作 */ if (unlikely(tmp & RWSEM_FLAG_WAITERS)) rwsem_wake(sem, tmp); }
七、总结
1. rwsem可以有多个读者同时进入临界区,但是读者和写者,写者和写者之间只能有一个进入临界区。
2. 有写者在临界区,对应成功获取写锁,从 down_write() 到 up_write() 之间,前者会在 sem->count 上或上 RWSEM_WRITER_LOCKED 标志,i后者会清除此标志。RWSEM_WRITER_LOCKED 标志标识的是写者在临界区中,慢速路径中写者被挂在等待链表上时是不会或上这个标志的。在读者持锁时会因为 sem->count 中有这个标志而被迫进入慢速路径在等待链表上进行等待。
标签:count,waiter,RWSEM,rwsem,reader,内核,Linux,sem 来源: https://www.cnblogs.com/hellokitty2/p/16343272.html