系统相关
首页 > 系统相关> > Linux中的workqueue机制

Linux中的workqueue机制

作者:互联网

转载与知乎https://zhuanlan.zhihu.com/p/91106844

任务工厂 - Linux中的workqueue机制 [一]

一、前言

Linux中的workqueue机制是中断底半部的一种实现,同时也是一种通用的任务异步处理的手段。进入workqueue队列处理的任务(work item)在代码中由"work_struct "结构体表示(定义在include/linux/workqueue.h):

struct work_struct {
	struct list_head entry;
	work_func_t func;
	atomic_long_t data;
};

其中,"entry"表示其所挂载的workqueue队列的节点,“func"就是要执行的任务的入口函数。而"data"表示的意义就比较丰富了。最后的4个bits是作为"flags"标志位使用的,中间的4个bits是用于flush功能的"color”。flush功能简单地说就是:等待workqueue队列上的任务都处理完,并清空workqueue队列(由于笔者也没有深入研究过这一块的具体实现原理,在本文的叙述中就不涉及这一部分内容了)。

剩下的bits在不同的场景下有不同的含义(相当于C语言里的"union"),它可以指向work item所在的workqueue队列的地址,由于低8位被挪作他用,因此要求workqueue队列的地址是按照256字节对齐的。它还可以表示处理work item的worker线程所在的pool的ID(关于pool将在本文的后半部分介绍)。

这种在一个C语言变量里塞入不同的类型的数据的方法在Linux的代码实现中还是不难见到的,在目前的workqueue机制中,"flags"和"color"所需的bits都较少,单独使用整形变量去表示确实会增加一定的内存消耗。但这种牺牲可读性的做法也被一些内核开发者认为是比较"ugly"的。

img

为了充分利用locality,通常选择将处理hardirq的CPU作为该hardirq对应的workqueue底半部的执行CPU,在早期Linux的实现中,每个CPU对应一个workqueue队列,并且每个CPU上只有一个worker线程来处理这个workqueue队列,也就是说workqueue队列和worker线程都是per-CPU的,且一一对应。

img

让我们看看这种设计存在什么问题。假设现在一个work item(设为w0)被添加到了workqueue队列上。w0需要运行5ms后休眠10ms,接着再运行5ms。在w0开始运行5ms和10ms后,另外两个work items(设为w1和w2)也分别加入了workqueue队列,w1和w2都是需要运行5ms,然后再休眠10ms(该示例来自内核Documentation/core-api/workqueue.rst文档)。

因为只有1个worker线程,所以即便在执行某个work item的时候休眠,其他的work item也得不到执行,因此将这3个work item执行完毕将总共需要55ms的时间。

img

假设现在一个CPU上有2个worker线程,分别为worker 1和worker 2,那么整个执行时间将缩短到35ms:

img

如果一个CPU上有3个worker线程,执行时间将进一步缩短到25ms:

img

二、cmwq

这种在一个CPU上运行多个worker线程的做法,就是2.6.36版本引入的,也是现在Linux内核所采用的concurrency managed workqueue,简称cmwq。一个CPU上是不可能“同时”运行多个线程的,所以这里的名称是concurrency(并发),而不是parallelism(并行)。

显然,设置合适的worker线程数目是很关键的,多了浪费资源,少了又不能充分利用CPU。大体的原则就是:如果现在一个CPU上的所有worker线程都进入了睡眠状态,但workqueue队列上还有未处理的work item,那么就再启动一个worker线程。

一个CPU上同一优先级的所有worker线程(优先级的概念见下文)共同构成了一个worker pool(此概念由内核v3.8引入),我们可能比较熟悉memory pool,当需要内存时,就从空余的memory pool中去获取,同样地,当workqueue上有work item待处理时,我们就从worker pool里挑选一个空闲的worker线程来服务这个work item。

worker pool在代码中由"worker_pool "结构体表示(定义在kernel/workqueue.c):

struct worker_pool {
	spinlock_t		lock;		/* the pool lock */
	int			cpu;		/* the associated cpu */
	int			id;		/* pool ID */

	struct list_head	idle_list;	/* list of idle workers */
	DECLARE_HASHTABLE(busy_hash, 6);        /* hash of busy workers */
    ...
}

如果一个worker正在处理work item,那么它就是busy的状态,将挂载在busy workers组成的6阶的hash表上。既然是hash表,那么就需要key,充当这个key的是正在被处理的work item的内存地址。

如果一个worker没有处理work item,那么它就是idle的状态,将挂载在idle workers组成的链表上。因为空闲的worker线程数目较少,用链表管理就可以了,而busy的worker线程可能较多,所以用hash表来组织,以加快查找的速度。

前面说过,有未处理的work item,内核就会启动一个新的worker线程,以提高效率。有创建就有消亡,当现在空闲的worker线程过多的时候,就需要销毁一部分worker线程,以节省CPU资源。就像一家公司,在项目紧张,人员不足的时候需要招人,在项目不足,人员过剩的时候可能就会裁员。至于保留多少空闲线程可以取得较理想的平衡,则涉及到一个颇为复杂的算法,在此就不展开了。

img

worker线程在代码中由"worker "结构体表示(定义在kernel/workqueue_internal.h):

struct worker {
	struct worker_pool	 *pool;		/* the associated pool */
	union {
		struct list_head  entry;	/* while idle */
		struct hlist_node hentry;	/* while busy */
	};

	struct work_struct	*current_work;	  /* work being processed */
	work_func_t		 current_func;	  /* current_work's fn */
	struct task_struct	*task;		  /* worker task */

	struct pool_workqueue	*current_pwq;     /* current_work's pwq */
        ...
}

其中,"pool"是这个worker线程所在的worker pool,根据worker线程所处的状态,它要么在idle worker组成的空闲链表中,要么在busy worker组成的hash表中。

"current_work"和"current_func"分别是worker线程正在处理的work item和其对应的入口函数。既然worker线程是一个内核线程,那么不管它是idle,还是busy的,都会对应一个task_struct(由"task"表示)。

img

"current_pwq"指向被服务的work item所在的workqueue队列,关于workqueue队列的介绍,以及它与worker pool之间的交互,见下文。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x7K2BeRJ-1634270167751)(C:\Users\TZW\Desktop\v2-12af837f37cb577b2cf1c354879a6719_1440w.jpg)]

在多个worker线程的cmwq模式下,按理一个CPU依然只对应一个workqueue队列,该队列由该CPU的worker pool里的线程共同服务(共享),但别忘了任务是有优先级的,虽不说像完整的系统那样将优先级划分地很细,至少要分成低优先级和高优先级两类吧。为此,目前的设计是一个CPU对应两个workqueue队列,相应地也有两个worker pool,分别服务于这个2个队列。

img

用"ps"命令来看下系统中都有哪些worker线程,worker线程被命名成了"kworker/n:x"的格式,其中n是worker线程所在的CPU的编号,x是其在worker pool中的编号,如果带了"H"后缀,说明这是高优先级的worker pool。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4cpPadLQ-1634270167754)(C:\Users\TZW\Desktop\v2-2dd85ca2f2ab4741ebad22f784b2f436_720w.jpg)]

还有一些带"u"前缀的,它表示"unbound",意思是这个worker线程不和任何的CPU绑定,而是被所有CPU共享,这种设计主要是为了增加灵活性。"u"后面的这个数字也不再表示CPU的编号,而是表示由这些unbound的worker线程组成的worker pool的ID号。

假设一个系统有4个CPU,那么它就对应至多8个"bound"的worker pool(4个普通优先级+4个高优先级)和8个workqueue队列,每个worker pool里有若干个worker线程,每个workqueue队列上有若干个work items。至于"unbound"的worker pool数目,则是由具体的需求决定的。

img

上图的"pwq"就是表示workqueue队列的"pool_workqueue "结构体(定义在kernel/workqueue.c)。这里有个对齐的要求,原因就是上文介绍的"work_struct->data"的低8位被占用的问题。

struct pool_workqueue {
	int	nr_active;	/* nr of active works */
	int	max_active;	/* max active works */

	struct  worker_pool	   *pool;   /* the associated pool */
	struct  workqueue_struct   *wq;	    /* the owning workqueue */
        ..
}__aligned(1 << WORK_STRUCT_FLAG_BITS);

pwq表示的是一个workqueue队列,而"workqueue_struct"表示的是一组同种类型的workqueue队列的集合,具体说来就是普通优先级的workqueue队列构成一个workqueue_struct,高优先级的workqueue队列和unbound的workqueue队列又分别构成两个workqueue_struct。

来看下"workqueue_struct "结构体的定义(代码位于kernel/workqueue.c):

struct workqueue_struct {
	struct list_head	list;		                   /* list of all workqueues */
	struct list_head	pwqs;		           /* all pwqs of this wq */
	struct pool_workqueue  *cpu_pwqs;         /* per-cpu pwqs */
        ...
}

"list"是workqueue_struct自身串接而成的链表,以方便内核管理。

img

"pwqs"是同种类型的pwq组成的链表。

img

三、初始化

至此,有关workqueue机制的5个结构体以及它们之间的相互关系就介绍完了,如果做个类比的话,那么work item就是工件,pwq队列就是这些工件组成的流水线,worker线程就是工人,worker pool就是一个班组的工人构成的集合。

假设一个集团有4个工厂(4个CPU),每个工厂都分了两条流水线,一条是由高级工构成的班组负责的高效流水线,一条是由初级工构成的班组负责的普通流水线,那么这4个工厂的高效流水线都属于同一个技术级别(workqueue_struct),普通流水线则同属于另一个技术级别。

至于unbound的生产班组,就理解为外包吧,它们从组织关系上不属于任何一个工厂,但是可以综合这4个工厂的生产任务的波动,提供对人力资源更机动灵活的配置。

来看一下workqueue机制所需的这一套东西都是怎么创建出来的。负责创建的函数主要是workqueue_init_early(),它的调用关系大致如下:

workqueue_init_early() 
    --> alloc_workqueue() 
        --> alloc_and_link_pwqs()
             --> init_pwq()

在workqueue_init_early()中,初始化了per-CPU的worker pool,并为这些worker pool指定了ID和关联了CPU,工厂是工人劳动的场所,这一步相当于是把工厂的基础设施建好了。

int __init workqueue_init_early(void)
{
	// 创建worker pool 
	for_each_possible_cpu(cpu) {
		struct worker_pool *pool;
		for_each_cpu_worker_pool(pool, cpu) {
                        init_worker_pool(pool);
			pool->cpu = cpu;
                        worker_pool_assign_id(pool);
	...
        }

        // 创建workqueue队列
	system_wq = alloc_workqueue("events", 0, 0);
	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE);
        ...				    
}

接下来就是调用alloc_workqueue()创建各种类型的workqueue_struct,基础的是三种:普通优先级的,高优先级的,以及不和CPU绑定的,这一步相当于是把流水线的分类,以及采用自产还是外包的形式确定了。

在alloc_workqueue()中,第3个参数指定了可以并发的worker线程的数目(工厂流水线的容量),最大为512(WQ_MAX_ACTIVE),设为0则表示使用默认值(WQ_MAX_ACTIVE/2=256)。至于为什么是512,仅仅是一种经验的估算而已:

img

在alloc_and_link_pwqs()中,才是创建per-CPU的pwq队列,并把pwq队列和对应的workqueue_struct关联起来。这一步相当于是按之前确定的分类,依次建好了4个工厂对应的流水线。

int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
	bool highpri = wq->flags & WQ_HIGHPRI;

	if (!(wq->flags & WQ_UNBOUND)) {
		wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
		for_each_possible_cpu(cpu) {
			init_pwq(pwq, wq, &cpu_pools[highpri]);
	...
}

然后就是各个工厂根据市场上来的订单(产生的work items)确立生产任务的要求,并根据订单生产所需的技能级别,招募高级工或者初级工,安排到对应的流水线上。在这个过程中,还需要根据订单量的变化,动态地调整工人的数目,以实现效益的最大化(CPU资源最充分的利用)。

四、参考

http://jake.dothome.co.kr/workqueue-1/

http://jake.dothome.co.kr/workqueue-2/

https://chasinglulu.github.io/2019/07/16/%E4%B8%AD%E6%96%AD%E5%BB%B6%E8%BF%9F%E5%A4%84%E7%90%86%E6%9C%BA%E5%88%B6%E3%80%8Cinterrupt-delay-processing%E3%80%8D/

.kr/workqueue-1/

http://jake.dothome.co.kr/workqueue-2/

https://chasinglulu.github.io/2019/07/16/%E4%B8%AD%E6%96%AD%E5%BB%B6%E8%BF%9F%E5%A4%84%E7%90%86%E6%9C%BA%E5%88%B6%E3%80%8Cinterrupt-delay-processing%E3%80%8D/

http://kernel.meizu.com/linux-workqueue.html

标签:struct,work,worker,线程,Linux,workqueue,机制,pool
来源: https://blog.csdn.net/u010351030/article/details/120780771