编程语言
首页 > 编程语言> > python-使用mpi4py并行化函数调用

python-使用mpi4py并行化函数调用

作者:互联网

我想使用mpi4py来优化问题.为了最小化我的功能,我使用scipy中的最小化例程

from scipy.optimize import minimize

def f(x, data) :
    #returns f(x)
x = minimize(f, x0, args=(data))

现在,如果我想使用mpi4py并行化函数.最小化算法的实现是顺序的,并且只能在一个进程上运行,因此仅对我的函数进行并行化,这不是问题,因为函数调用是最耗时的步骤.但是我无法弄清楚如何使用并行和顺序部分来实现此问题.

这是我的尝试:

from scipy.optimize import minimize
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

N = 100 # for testing
step = N//size # say that N is divisible by size
def mpi_f(x, data) :
    x0 = x[rank*step:(rank+1)*step]
    res = f(x0, data)
    res = comm.gather(res, root=0)
    if rank == 0 :
        return res

if rank == 0 :
   x = np.zeros(N)
   xs = minimize(mpi_f, x, args=(data))

这显然不起作用,因为mpi_f仅在进程0上运行.所以我问我应该如何进行?

谢谢.

解决方法:

在您的代码中,根进程是唯一一个调用comm.gather()的进程,因为根进程是唯一一个调用并行成本函数的进程.因此,程序面临僵局.您已经很清楚这个问题.

为了克服这种僵局,其他进程必须根据最小化需求多次调用cost函数.由于此调用数量是未知的,因此while循环似乎适合这些过程.

将定义while循环的停止条件.该标志将从根进程广播到所有进程,因为根进程是唯一意识到minimal()函数结束的事实的人.广播必须在成本函数中执行,因为所有过程都必须在每次迭代中测试最小化函数的结尾.由于最小化使用了函数的返回值,因此标志为passed by reference via a mutable type

最后,这是您的问题的潜在解决方案.它由mpirun -np 4 python main.py运行.我使用fmin()而不是minimal(),因为我的scipy版本已过时.

#from scipy.optimize import minimize
from scipy.optimize import fmin
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

N = 100 # for testing
step = N//size # say that N is divisible by size

def parallel_function_caller(x,stopp):
    stopp[0]=comm.bcast(stopp[0], root=0)
    summ=0
    if stopp[0]==0:
        #your function here in parallel
        x=comm.bcast(x, root=0)
        array= np.arange(x[0]-N/2.+rank*step-42,x[0]-N/2.+(rank+1)*step-42,1.)
        summl=np.sum(np.square(array))
        summ=comm.reduce(summl,op=MPI.SUM, root=0)
        if rank==0:
            print "value is "+str(summ)
    return summ

if rank == 0 :
   stop=[0]
   x = np.zeros(1)
   x[0]=20
   #xs = minimize(parallel_function_caller, x, args=(stop))
   xs = fmin(parallel_function_caller,x0= x, args=(stop,))
   print "the argmin is "+str(xs)
   stop=[1]
   parallel_function_caller(x,stop)

else :
   stop=[0]
   x=np.zeros(1)
   while stop[0]==0:
      parallel_function_caller(x,stop)

标签:multiprocessing,mpi4py,python
来源: https://codeday.me/bug/20191027/1941429.html