系统相关
首页 > 系统相关> > python-在dask生成的进程中调用dask

python-在dask生成的进程中调用dask

作者:互联网

我们有一个包含许多任务的大型项目.我们使用简单图表来安排每个任务.该图的一小部分示例如下.请注意,dask设置为多处理模式.

dask_graph:

  universe: !!python/tuple [gcsstrategies.svc.business_service.UniverseService.load_universe_object, CONTEXT]
  raw_market_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_market_data, CONTEXT, universe]
  raw_fundamental_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_fundamental_data, CONTEXT, universe]

dask_keys: [raw_fundamental_data]

现在,其中一项任务raw_fundamental_data使用@delay懒惰地调度了dask任务,并使用dask.compute()运行了它们.选择此设计的原因是,将在运行时根据运行时参数动态选择在raw_fundamental_data中由dask计划并懒惰运行的任务列表.

我们看到的错误是:

daemonic processes are not allowed to have children

我们了解这是因为产生的进程正在尝试产生子代.有什么解决办法吗? dask是否可以通过@delay或其他方法允许通过daskgraph计划的任务计划并延迟运行自己的任务?

请注意,在我们的系统中,有许多任务将使用多重处理来运行自己的任务.因此,顺序执行不是一个选择.

解决方法:

多处理调度程序无法执行这种操作.但是,distributed scheduler是(您也可以在单台计算机上轻松使用分布式调度程序.

相关文档页面在这里:

> http://distributed.readthedocs.io/en/latest/task-launch.html
> http://dask.readthedocs.io/en/latest/scheduler-choice.html

这是一个小例子

In [1]: from dask.distributed import Client, local_client

In [2]: def f(n):
   ...:     with local_client() as lc:
   ...:         futures = [lc.submit(lambda x: x + 1, i) for i in range(n)]
   ...:         total = lc.submit(sum, futures)
   ...:         return total.result()
   ...:     

In [3]: c = Client()  # start processes on local machine

In [4]: future = c.submit(f, 10)

In [5]: future.result()
Out[5]: 55

这使用并发.futures接口而不是dask.delayed进行dask,但是您也可以使用dask.delayed.参见http://distributed.readthedocs.io/en/latest/manage-computation.html

标签:dask,multiprocessing,python
来源: https://codeday.me/bug/20191111/2022631.html