编程语言
首页 > 编程语言> > Python-Dask Distributed:在每个工人初始化任务中引入图密度

Python-Dask Distributed:在每个工人初始化任务中引入图密度

作者:互联网

在分布式分布式中,任务通过调度程序分布在群集节点上.我希望介绍对提交给节点的任务的每个节点的依赖性.简而言之,我要执行的计算操作需要:

>将数据预加载到每个节点的GPU上.
>在分块的dask数组中与其他数据一起在每个节点上执行GPU计算.

我也想将(1)和(2)多次放入不同的数据集.

我尝试将其设置为最小示例:

from __future__ import print_function

import dask.array as da
from dask.base import tokenize
from distributed import (Client, LocalCluster,
                         get_worker, as_completed)
import numpy as np

cluster = LocalCluster(n_workers=0)
cluster.start_worker(name="Alice")
cluster.start_worker(name="Bob")
cluster.start_worker(name="Eve")

with cluster, Client(cluster.scheduler_address) as client:
    workers = client.scheduler_info()['workers'].values()
    workers =  [v['name'] for v in workers]

    print("Workers {}".format(workers))

    def init_worker():
        get_worker()._stuff = 0
        return "OK"

    # Call init_worker on each worker. Need pure to
    # ensure this happens multiple times
    init_futures = [client.submit(init_worker, pure=False,
                                            workers=[w])
                                  for w in workers]

    assert all(f.result() == "OK" for f in as_completed(init_futures))

    A = da.arange(0, 20, chunks=(5,), dtype=np.float64)

    def inc_worker(A):
        w = get_worker()
        w._stuff += 1
        print("{}[{}]".format(w.name, w._stuff))
        return A + w._stuff

    def increment(A):
        """ Call inc_worker """
        from dask.base import tokenize
        name = 'increment-' + tokenize(A)
        dsk = { (name, i): (inc_worker, (A.name, i))
                        for n, i in A.dask.keys() }

        dsk.update(A.dask)
        return da.Array(dsk, name, A.chunks, A.dtype)

    print(increment(A).compute())
    print(increment(A).compute())

我想找到一种方法,使提交给每个工作人员的increment- *任务取决于提交给每个工作人员的init-worker- *任务.
换句话说,我要避免等待客户端中的init_futures完成.引入的问题是,尽管我们知道每个init-worker- *任务都与每个工作程序相关联,但没有明显的方法可以预先知道增量-*任务的工作程序关联.

一种可能的方法:

>对于每个inc_worker调用,都生成一个local_client(),该提交以get_worker().data中的init-worker- *为依赖项提交任务.我不喜欢这样,因为开销似乎很大.

有关如何执行此操作的任何建议?

编辑1:实际上,这无需等待init_futures完成即可工作,大概是因为它们是在将任何increment- *任务提交给工作人员之前先提交给工作人员调度程序的.仍然感觉我在做一个假设,尽管这个假设可能并不总是正确的.

编辑2:提到2步骤应在不同的数据集上运行多次.

解决方法:

一些选项:

>使用client.run并等待.这可以完成您上面提交技巧的工作,但是更明确,更轻松.但是,它确实会阻止,您已经说过不想这样做.

client.run(init_worker)

>使用worker –preload脚本在worker启动时运行任意代码.查看http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization

cluster.start_worker(..., preload=['myscript.py'])

>使init_worker幂等(可以多次运行而不会产生影响),并始终在inc_worker中调用它

def init_worker():
    if not hasattr(get_worker(), '_stuff'):
        get_worker()._stuff = 0

def inc_worker(...):
    init_worker(...)
    ... do other things ...

另外,我注意到您正在手动构建dask.arrays.您可能需要查看x.map_blocks(my_func)和x.to_delayed / x.from_delayed

标签:dask,python
来源: https://codeday.me/bug/20191111/2018653.html