How can I use Dask to perform a parallel operation on slices of a NumPy array?
I have a coordinate array of size n_slice x 2048 x 3, where n_slice is in the tens of thousands. I want to apply the following operation to each 2048 x 3 slice independently:
import numpy as np
from scipy.spatial.distance import pdist

# load coor from a binary xyz file, dcd format
n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

# this loop is what I want to parallelize; each slice is completely independent
for i in range(n_slice):
    dist[i, r[:, None] < r] = pdist(coor[i])
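As a side note, the mask-based assignment above works because boolean indexing writes values in row-major order, which matches the pair ordering of pdist's condensed output. A small sketch with made-up sizes checking this against scipy's squareform:

```python
import numpy as np
from scipy.spatial.distance import pdist, squareform

# tiny stand-in for one 2048 x 3 slice
pts = np.random.random((5, 3))

r = np.arange(5)
dist = np.zeros((5, 5))
dist[r[:, None] < r] = pdist(pts)  # boolean indexing assigns in row-major order

# squareform builds the full symmetric matrix; its strict upper triangle should match
full = squareform(pdist(pts))
print(np.allclose(dist, np.triu(full, k=1)))  # True
```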
I tried to use Dask by making coor a dask.array,
import dask.array as da
dcoor = da.from_array(coor, chunks=(1, 2048, 3))
but simply replacing coor with dcoor does not expose any parallelism. I can see how to spawn parallel threads to run over each slice, but how do I leverage Dask to handle the parallelism?
Here is a parallel implementation using concurrent.futures:
import concurrent.futures
import multiprocessing

n_cpu = multiprocessing.cpu_count()

def get_dist(coor, dist, r):
    dist[r[:, None] < r] = pdist(coor)

# load coor from a binary xyz file, dcd format
n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

with concurrent.futures.ThreadPoolExecutor(max_workers=n_cpu) as executor:
    for i in range(n_slice):
        executor.submit(get_dist, coor[i], dist[i], r)
Since there is no inter-chunk computation, this problem may not be a great fit for Dask.
Solution:
map_blocks
The map_blocks method may be able to help:
dcoor.map_blocks(pdist)
Uneven arrays
It looks like you're doing some fancy slicing to insert particular values into particular locations of an output array. This will probably be awkward to do with dask.arrays. Instead, I recommend making a function that produces a numpy array:
def myfunc(chunk):
    values = pdist(chunk[0, :, :])
    output = np.zeros((2048, 2048))
    r = np.arange(2048)
    output[r[:, None] < r] = values
    return output

dcoor.map_blocks(myfunc)
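Because pdist changes each block's shape, map_blocks needs to be told what the output blocks look like. One way to wire this up is a sketch below, with small made-up sizes; returning a (1, n, n) block and passing the chunks= and dtype= arguments (an assumption beyond the answer's snippet) keeps the leading slice axis intact:

```python
import numpy as np
import dask.array as da
from scipy.spatial.distance import pdist

n_slice, n_coor = 4, 16  # small stand-ins; the real data uses n_coor = 2048

def per_slice(chunk):
    # chunk has shape (1, n_coor, 3); return a (1, n_coor, n_coor) block
    values = pdist(chunk[0, :, :])
    output = np.zeros((1, n_coor, n_coor))
    r = np.arange(n_coor)
    output[0, r[:, None] < r] = values
    return output

coor = np.random.random((n_slice, n_coor, 3))
dcoor = da.from_array(coor, chunks=(1, n_coor, 3))

# chunks= declares each output block's shape; dtype= avoids a trial call
dist = dcoor.map_blocks(per_slice, chunks=(1, n_coor, n_coor), dtype=float)
result = dist.compute()  # shape (n_slice, n_coor, n_coor), one slice per chunk
```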
Delayed
In the worst case you can always use dask.delayed:
from dask import delayed, compute
coor2 = delayed(coor)
slices = [coor2[i] for i in range(coor.shape[0])]
slices2 = [delayed(pdist)(slice) for slice in slices]
results = compute(*slices2)
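The compute call returns a tuple of condensed distance vectors, one per slice; if full square matrices are wanted they can be rebuilt afterwards. A sketch with small made-up sizes (note that squareform fills both triangles, unlike the upper-triangle-only fill in the question):

```python
import numpy as np
from scipy.spatial.distance import pdist, squareform
from dask import delayed, compute

coor = np.random.random((4, 16, 3))  # small stand-in for the real array

tasks = [delayed(pdist)(coor[i]) for i in range(coor.shape[0])]
condensed = compute(*tasks)  # tuple of 1-D condensed distance vectors

# rebuild an (n_slice, n_coor, n_coor) array; squareform gives symmetric matrices
dist = np.stack([squareform(c) for c in condensed])
```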
Tags: dask, parallel-processing, arrays, python, numpy  Source: https://codeday.me/bug/20191026/1937778.html