Python-Dask Distributed:在每个工人初始化任务中引入图密度
作者:互联网
在分布式分布式中,任务通过调度程序分布在群集节点上.我希望介绍对提交给节点的任务的每个节点的依赖性.简而言之,我要执行的计算操作需要:
>将数据预加载到每个节点的GPU上.
>在分块的dask数组中与其他数据一起在每个节点上执行GPU计算.
我也想将(1)和(2)多次放入不同的数据集.
我尝试将其设置为最小示例:
from __future__ import print_function
import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
get_worker, as_completed)
import numpy as np
cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")
with cluster, Client(cluster.scheduler_address) as client:
workers = client.scheduler_info()['workers'].values()
workers = [v['name'] for v in workers]
print("Workers {}".format(workers))
def init_worker():
get_worker()._stuff = 0
return "OK"
# Call init_worker on each worker. Need pure to
# ensure this happens multiple times
init_futures = [client.submit(init_worker, pure=False,
workers=[w])
for w in workers]
assert all(f.result() == "OK" for f in as_completed(init_futures))
A = da.arange(0, 20, chunks=(5,), dtype=np.float64)
def inc_worker(A):
w = get_worker()
w._stuff += 1
print("{}[{}]".format(w.name, w._stuff))
return A + w._stuff
def increment(A):
""" Call inc_worker """
from dask.base import tokenize
name = 'increment-' + tokenize(A)
dsk = { (name, i): (inc_worker, (A.name, i))
for n, i in A.dask.keys() }
dsk.update(A.dask)
return da.Array(dsk, name, A.chunks, A.dtype)
print(increment(A).compute())
print(increment(A).compute())
我想找到一种方法,使提交给每个工作人员的increment- *任务取决于提交给每个工作人员的init-worker- *任务.
换句话说,我要避免等待客户端中的init_futures完成.引入的问题是,尽管我们知道每个init-worker- *任务都与每个工作程序相关联,但没有明显的方法可以预先知道增量-*任务的工作程序关联.
一种可能的方法:
>对于每个inc_worker调用,都生成一个local_client(),该提交以get_worker().data中的init-worker- *为依赖项提交任务.我不喜欢这样,因为开销似乎很大.
有关如何执行此操作的任何建议?
编辑1:实际上,这无需等待init_futures完成即可工作,大概是因为它们是在将任何increment- *任务提交给工作人员之前先提交给工作人员调度程序的.仍然感觉我在做一个假设,尽管这个假设可能并不总是正确的.
编辑2:提到2步骤应在不同的数据集上运行多次.
解决方法:
一些选项:
>使用client.run并等待.这可以完成您上面提交技巧的工作,但是更明确,更轻松.但是,它确实会阻止,您已经说过不想这样做.
client.run(init_worker)
>使用worker –preload脚本在worker启动时运行任意代码.查看http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization
cluster.start_worker(..., preload=['myscript.py'])
>使init_worker幂等(可以多次运行而不会产生影响),并始终在inc_worker中调用它
def init_worker():
if not hasattr(get_worker(), '_stuff'):
get_worker()._stuff = 0
def inc_worker(...):
init_worker(...)
... do other things ...
另外,我注意到您正在手动构建dask.arrays.您可能需要查看x.map_blocks(my_func)和x.to_delayed / x.from_delayed
标签:dask,python 来源: https://codeday.me/bug/20191111/2018653.html