编程语言
首页 > 编程语言> > python-mpi4py:动态数据处理

python-mpi4py:动态数据处理

作者:互联网

我有一个包含股票行情自动收录器的向量,例如代码= [‘AAPL’,’XOM’,’GOOG’],在我的“传统” python程序中,我将遍历该股票行情向量,选择一个股票行情字符串,例如AAPL,导入一个csv包含AAPL股票收益的文件,使用收益作为通用函数的输入,最后生成一个csv文件作为输出.我有4000多个股票行情器,要应用到每个股票行情器的功能需要花费一些时间来处理.我可以使用mpi4py软件包访问计算机群集,每个作业可以访问约100个处理器.我很了解(并能够实现)此mpi example在python中:

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
    data = [i for i in range(8)]
# dividing data into chunks
    chunks = [[] for _ in range(size)]
    for i, chunk in enumerate(data):
        chunks[i % size].append(chunk)
else:
    data = None
    chunks = None
data = comm.scatter(chunks, root=0)
print str(rank) + ': ' + str(data)

[cha@cluster] ~/utils> mpirun -np 3 ./mpi.py 
2: [2, 5]
0: [0, 3, 6]
1: [1, 4, 7]

因此,在此示例中,我们有一个大小为8的数据向量,并将相等数量的数据元素分配给每个处理器(总共3个).如何使用上面类似的示例,为每个处理器分配一个股票行情记录器,并应用需要为每个股票行情器运行的功能?我如何告诉python,一旦处理器有空,就返回代号矢量并处理尚未处理的代号?

解决方法:

还有另一种思考方式.您有100个处理器处理4000个数据块.您可以看到的一种方式是,每个处理器都会获得一个要操作的数据块.平均分配后,每个处理器将获得40个标记.处理器1将获得0-39,处理器2将获得40-79,依此类推.

这样思考,您不必担心处理器完成任务时会发生什么.循环一下:

block_size = len(tickers) / size # this will be 40 in your example
for i in range(block_size):
    ticker = tickers[rank * block_size + i]
    process(ticker)

def process(ticker):
    # load data
    # process data
    # output data

这有意义吗?

[编辑]
如果您想了解更多信息,这实际上只是row-major order索引的一种变体,它是一种访问存储在单个内存中的多维数据的通用方法.

标签:dynamic,mpi,mpi4py,python
来源: https://codeday.me/bug/20191029/1960060.html