Linux中的workqueue机制
作者:互联网
转载与知乎https://zhuanlan.zhihu.com/p/91106844
一、前言
Linux中的workqueue机制是中断底半部的一种实现,同时也是一种通用的任务异步处理的手段。进入workqueue队列处理的任务(work item)在代码中由"work_struct "结构体表示(定义在include/linux/workqueue.h):
struct work_struct {
struct list_head entry;
work_func_t func;
atomic_long_t data;
};
其中,"entry"表示其所挂载的workqueue队列的节点,“func"就是要执行的任务的入口函数。而"data"表示的意义就比较丰富了。最后的4个bits是作为"flags"标志位使用的,中间的4个bits是用于flush功能的"color”。flush功能简单地说就是:等待workqueue队列上的任务都处理完,并清空workqueue队列(由于笔者也没有深入研究过这一块的具体实现原理,在本文的叙述中就不涉及这一部分内容了)。
剩下的bits在不同的场景下有不同的含义(相当于C语言里的"union"),它可以指向work item所在的workqueue队列的地址,由于低8位被挪作他用,因此要求workqueue队列的地址是按照256字节对齐的。它还可以表示处理work item的worker线程所在的pool的ID(关于pool将在本文的后半部分介绍)。
这种在一个C语言变量里塞入不同的类型的数据的方法在Linux的代码实现中还是不难见到的,在目前的workqueue机制中,"flags"和"color"所需的bits都较少,单独使用整形变量去表示确实会增加一定的内存消耗。但这种牺牲可读性的做法也被一些内核开发者认为是比较"ugly"的。
为了充分利用locality,通常选择将处理hardirq的CPU作为该hardirq对应的workqueue底半部的执行CPU,在早期Linux的实现中,每个CPU对应一个workqueue队列,并且每个CPU上只有一个worker线程来处理这个workqueue队列,也就是说workqueue队列和worker线程都是per-CPU的,且一一对应。
让我们看看这种设计存在什么问题。假设现在一个work item(设为w0)被添加到了workqueue队列上。w0需要运行5ms后休眠10ms,接着再运行5ms。在w0开始运行5ms和10ms后,另外两个work items(设为w1和w2)也分别加入了workqueue队列,w1和w2都是需要运行5ms,然后再休眠10ms(该示例来自内核Documentation/core-api/workqueue.rst文档)。
因为只有1个worker线程,所以即便在执行某个work item的时候休眠,其他的work item也得不到执行,因此将这3个work item执行完毕将总共需要55ms的时间。
假设现在一个CPU上有2个worker线程,分别为worker 1和worker 2,那么整个执行时间将缩短到35ms:
如果一个CPU上有3个worker线程,执行时间将进一步缩短到25ms:
二、cmwq
这种在一个CPU上运行多个worker线程的做法,就是2.6.36版本引入的,也是现在Linux内核所采用的concurrency managed workqueue,简称cmwq。一个CPU上是不可能“同时”运行多个线程的,所以这里的名称是concurrency(并发),而不是parallelism(并行)。
显然,设置合适的worker线程数目是很关键的,多了浪费资源,少了又不能充分利用CPU。大体的原则就是:如果现在一个CPU上的所有worker线程都进入了睡眠状态,但workqueue队列上还有未处理的work item,那么就再启动一个worker线程。
一个CPU上同一优先级的所有worker线程(优先级的概念见下文)共同构成了一个worker pool(此概念由内核v3.8引入),我们可能比较熟悉memory pool,当需要内存时,就从空余的memory pool中去获取,同样地,当workqueue上有work item待处理时,我们就从worker pool里挑选一个空闲的worker线程来服务这个work item。
worker pool在代码中由"worker_pool "结构体表示(定义在kernel/workqueue.c):
struct worker_pool {
spinlock_t lock; /* the pool lock */
int cpu; /* the associated cpu */
int id; /* pool ID */
struct list_head idle_list; /* list of idle workers */
DECLARE_HASHTABLE(busy_hash, 6); /* hash of busy workers */
...
}
如果一个worker正在处理work item,那么它就是busy的状态,将挂载在busy workers组成的6阶的hash表上。既然是hash表,那么就需要key,充当这个key的是正在被处理的work item的内存地址。
如果一个worker没有处理work item,那么它就是idle的状态,将挂载在idle workers组成的链表上。因为空闲的worker线程数目较少,用链表管理就可以了,而busy的worker线程可能较多,所以用hash表来组织,以加快查找的速度。
前面说过,有未处理的work item,内核就会启动一个新的worker线程,以提高效率。有创建就有消亡,当现在空闲的worker线程过多的时候,就需要销毁一部分worker线程,以节省CPU资源。就像一家公司,在项目紧张,人员不足的时候需要招人,在项目不足,人员过剩的时候可能就会裁员。至于保留多少空闲线程可以取得较理想的平衡,则涉及到一个颇为复杂的算法,在此就不展开了。
worker线程在代码中由"worker "结构体表示(定义在kernel/workqueue_internal.h):
struct worker {
struct worker_pool *pool; /* the associated pool */
union {
struct list_head entry; /* while idle */
struct hlist_node hentry; /* while busy */
};
struct work_struct *current_work; /* work being processed */
work_func_t current_func; /* current_work's fn */
struct task_struct *task; /* worker task */
struct pool_workqueue *current_pwq; /* current_work's pwq */
...
}
其中,"pool"是这个worker线程所在的worker pool,根据worker线程所处的状态,它要么在idle worker组成的空闲链表中,要么在busy worker组成的hash表中。
"current_work"和"current_func"分别是worker线程正在处理的work item和其对应的入口函数。既然worker线程是一个内核线程,那么不管它是idle,还是busy的,都会对应一个task_struct(由"task"表示)。
"current_pwq"指向被服务的work item所在的workqueue队列,关于workqueue队列的介绍,以及它与worker pool之间的交互,见下文。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x7K2BeRJ-1634270167751)(C:\Users\TZW\Desktop\v2-12af837f37cb577b2cf1c354879a6719_1440w.jpg)]
在多个worker线程的cmwq模式下,按理一个CPU依然只对应一个workqueue队列,该队列由该CPU的worker pool里的线程共同服务(共享),但别忘了任务是有优先级的,虽不说像完整的系统那样将优先级划分地很细,至少要分成低优先级和高优先级两类吧。为此,目前的设计是一个CPU对应两个workqueue队列,相应地也有两个worker pool,分别服务于这个2个队列。
用"ps"命令来看下系统中都有哪些worker线程,worker线程被命名成了"kworker/n:x"的格式,其中n是worker线程所在的CPU的编号,x是其在worker pool中的编号,如果带了"H"后缀,说明这是高优先级的worker pool。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4cpPadLQ-1634270167754)(C:\Users\TZW\Desktop\v2-2dd85ca2f2ab4741ebad22f784b2f436_720w.jpg)]
还有一些带"u"前缀的,它表示"unbound",意思是这个worker线程不和任何的CPU绑定,而是被所有CPU共享,这种设计主要是为了增加灵活性。"u"后面的这个数字也不再表示CPU的编号,而是表示由这些unbound的worker线程组成的worker pool的ID号。
假设一个系统有4个CPU,那么它就对应至多8个"bound"的worker pool(4个普通优先级+4个高优先级)和8个workqueue队列,每个worker pool里有若干个worker线程,每个workqueue队列上有若干个work items。至于"unbound"的worker pool数目,则是由具体的需求决定的。
上图的"pwq"就是表示workqueue队列的"pool_workqueue "结构体(定义在kernel/workqueue.c)。这里有个对齐的要求,原因就是上文介绍的"work_struct->data"的低8位被占用的问题。
struct pool_workqueue {
int nr_active; /* nr of active works */
int max_active; /* max active works */
struct worker_pool *pool; /* the associated pool */
struct workqueue_struct *wq; /* the owning workqueue */
..
}__aligned(1 << WORK_STRUCT_FLAG_BITS);
- "max_active"和"nr_active"分别是该workqueue队列最大允许和实际挂载的work item的数目。最大允许的work item数目也就决定了该workqueue队列所对应的work pool上最多可能的活跃(busy)的worker线程的数目。
- “pool"指向服务这个workqueue队列的worker pool,至于这个"wq”,它的数据类型是"workqueue_struct",从名字上看这个"workqueue_struct"好像也是表示workqueue队列的,那它和pwq(pool_workqueue)有什么区别呢?
pwq表示的是一个workqueue队列,而"workqueue_struct"表示的是一组同种类型的workqueue队列的集合,具体说来就是普通优先级的workqueue队列构成一个workqueue_struct,高优先级的workqueue队列和unbound的workqueue队列又分别构成两个workqueue_struct。
来看下"workqueue_struct "结构体的定义(代码位于kernel/workqueue.c):
struct workqueue_struct {
struct list_head list; /* list of all workqueues */
struct list_head pwqs; /* all pwqs of this wq */
struct pool_workqueue *cpu_pwqs; /* per-cpu pwqs */
...
}
"list"是workqueue_struct自身串接而成的链表,以方便内核管理。
"pwqs"是同种类型的pwq组成的链表。
三、初始化
至此,有关workqueue机制的5个结构体以及它们之间的相互关系就介绍完了,如果做个类比的话,那么work item就是工件,pwq队列就是这些工件组成的流水线,worker线程就是工人,worker pool就是一个班组的工人构成的集合。
假设一个集团有4个工厂(4个CPU),每个工厂都分了两条流水线,一条是由高级工构成的班组负责的高效流水线,一条是由初级工构成的班组负责的普通流水线,那么这4个工厂的高效流水线都属于同一个技术级别(workqueue_struct),普通流水线则同属于另一个技术级别。
至于unbound的生产班组,就理解为外包吧,它们从组织关系上不属于任何一个工厂,但是可以综合这4个工厂的生产任务的波动,提供对人力资源更机动灵活的配置。
来看一下workqueue机制所需的这一套东西都是怎么创建出来的。负责创建的函数主要是workqueue_init_early(),它的调用关系大致如下:
workqueue_init_early()
--> alloc_workqueue()
--> alloc_and_link_pwqs()
--> init_pwq()
在workqueue_init_early()中,初始化了per-CPU的worker pool,并为这些worker pool指定了ID和关联了CPU,工厂是工人劳动的场所,这一步相当于是把工厂的基础设施建好了。
int __init workqueue_init_early(void)
{
// 创建worker pool
for_each_possible_cpu(cpu) {
struct worker_pool *pool;
for_each_cpu_worker_pool(pool, cpu) {
init_worker_pool(pool);
pool->cpu = cpu;
worker_pool_assign_id(pool);
...
}
// 创建workqueue队列
system_wq = alloc_workqueue("events", 0, 0);
system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE);
...
}
接下来就是调用alloc_workqueue()创建各种类型的workqueue_struct,基础的是三种:普通优先级的,高优先级的,以及不和CPU绑定的,这一步相当于是把流水线的分类,以及采用自产还是外包的形式确定了。
在alloc_workqueue()中,第3个参数指定了可以并发的worker线程的数目(工厂流水线的容量),最大为512(WQ_MAX_ACTIVE),设为0则表示使用默认值(WQ_MAX_ACTIVE/2=256)。至于为什么是512,仅仅是一种经验的估算而已:
在alloc_and_link_pwqs()中,才是创建per-CPU的pwq队列,并把pwq队列和对应的workqueue_struct关联起来。这一步相当于是按之前确定的分类,依次建好了4个工厂对应的流水线。
int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI;
if (!(wq->flags & WQ_UNBOUND)) {
wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
for_each_possible_cpu(cpu) {
init_pwq(pwq, wq, &cpu_pools[highpri]);
...
}
然后就是各个工厂根据市场上来的订单(产生的work items)确立生产任务的要求,并根据订单生产所需的技能级别,招募高级工或者初级工,安排到对应的流水线上。在这个过程中,还需要根据订单量的变化,动态地调整工人的数目,以实现效益的最大化(CPU资源最充分的利用)。
四、参考
http://jake.dothome.co.kr/workqueue-1/
http://jake.dothome.co.kr/workqueue-2/
https://chasinglulu.github.io/2019/07/16/%E4%B8%AD%E6%96%AD%E5%BB%B6%E8%BF%9F%E5%A4%84%E7%90%86%E6%9C%BA%E5%88%B6%E3%80%8Cinterrupt-delay-processing%E3%80%8D/
.kr/workqueue-1/
http://jake.dothome.co.kr/workqueue-2/
https://chasinglulu.github.io/2019/07/16/%E4%B8%AD%E6%96%AD%E5%BB%B6%E8%BF%9F%E5%A4%84%E7%90%86%E6%9C%BA%E5%88%B6%E3%80%8Cinterrupt-delay-processing%E3%80%8D/
http://kernel.meizu.com/linux-workqueue.html
标签:struct,work,worker,线程,Linux,workqueue,机制,pool 来源: https://blog.csdn.net/u010351030/article/details/120780771