编程语言
首页 > 编程语言> > python – 根据Celery的结果路由到worker?

python – 根据Celery的结果路由到worker?

作者:互联网

我最近一直在使用Storm,其中包含一个名为字段分组的概念(与Celery中的group()概念无关),其中具有某个键的消息将始终路由到同一个工作者.

为了更清楚地定义我的意思,这里是来自Storm wiki.

Fields grouping: The stream is partitioned by the fields specified in
the grouping. For example, if the stream is grouped by the “user-id”
field, tuples with the same “user-id” will always go to the same task,
but tuples with different “user-id”‘s may go to different tasks.

例如,从单词列表中读取,我想将以a,b,c开头的单词路由到仅工作进程,d,e,f到另一个,等等.

想要这个的原因可能是因为我希望一个进程负责一组相同数据的数据库读/写,因此进程之间没有竞争条件.

我正在尝试找出在Celery中实现这一目标的最佳方法.

到目前为止,我最好的解决方案是为每个“组”使用一个队列(例如letters.a,letters.d),并确保工作进程的数量与队列数完全匹配.缺点是它必须每个工人只运行一个进程,以及各种情况,例如工人死亡或添加/删除工作人员.

我是Celery的新手,所以如果我提到的概念不正确,请纠正我.

解决方法:

有一些胶水涉及,但这是概念:

有一种方法可以使用CELERY_WORKER_DIRECT将任务直接发送给不同的工作人员.将其设置为True会创建到每个工作人员的路径.

我通过使用celery.current_app.control.inspect().ping()定期确定活动工作者或确定活动主机.例如.:

>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
['host5', 'host6']

当我需要通过一个键进行路由时,我会根据工作者的数量对值进行哈希处理.这将平均分配任务,并将相同的密钥保存到同一个worker.例如.:

>>> host_id = hash('hello') % len(hosts)
1
>>> host = hosts[host_id]
'host6'

然后在执行任务时,我只需指定交换和路由键,如下所示:

my_task.apply_async(exchange='C.dq', routing_key=host)

有一些缺点:

>从我所看到的,设置>的并发性; 1对一个工人来说,每个过程都会消耗掉同一个过程,否定了整个过程.不幸的解决方法是将其保持在1.
>如果工作人员在ping()和apply_async之间发生故障,则该消息将被发送到不存在的路由.解决此问题的方法是捕获超时,重新声明可用主机,重新发送和重新发送.

标签:python,multithreading,celery,apache-storm
来源: https://codeday.me/bug/20190624/1282453.html