python – 多处理另一个函数的函数
作者:互联网
我正在对模拟的时间序列进行分析.基本上,它为每个时间步都执行相同的任务.由于存在非常多的时间步长,并且由于每个步骤的分析是独立的,因此我想创建一个可以多处理另一个函数的函数.后者将有参数,并返回结果.
使用共享的dictionnary和lib concurrent.futures,我设法写了这个:
import concurrent.futures as Cfut
def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
# function : function that is running in parallel
# param_list : list of items
# group_size : size of the groups
# Nworkers : number of group/items running in the same time
# **param_fixed : passing parameters
manager = mlp.Manager()
dic = manager.dict()
executor = Cfut.ProcessPoolExecutor(Nworkers)
futures = [executor.submit(function, param, dic, *args)
for param in grouper(param_list, group_size)]
Cfut.wait(futures)
return [dic[i] for i in sorted(dic.keys())]
通常,我可以像这样使用它:
def read_file(files, dictionnary):
for file in files:
i = int(file[4:9])
#print(str(i))
if 'bz2' in file:
os.system('bunzip2 ' + file)
file = file[:-4]
dictionnary[i] = np.loadtxt(file)
os.system('bzip2 ' + file)
Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread))
或者像这样:
def autocorr(x):
result = np.correlate(x, x, mode='full')
return result[result.size//2:]
def find_lambda_finger(indexes, dic, Deviation):
for i in indexes :
#print(str(i))
# Beach = Deviation[i,:] - np.mean(Deviation[i,:])
dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax = True)
args = [Deviation]
Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args)
基本上,它正在发挥作用.但它运作不佳.有时会崩溃.有时候它实际上会启动一些等于Nworkers的python进程,有时候我只指定Nworkers = 15时有两个或三个运行.
例如,我获得的经典错误在我提出的以下主题中描述:Calling matplotlib AFTER multiprocessing sometimes results in error : main thread not in main loop
什么是Pythonic实现我想要的方式?如何改进控制此功能?如何控制更多运行python进程的数量?
解决方法:
Python多处理的基本概念之一是使用队列.当您有一个可以迭代的输入列表并且不需要由子流程更改时,它可以很好地工作.它还可以很好地控制所有进程,因为您生成了所需的数字,可以将它们空闲或停止.
它也更容易调试.明确共享数据通常是一种更难以正确设置的方法.
根据定义,队列可以保存任何内容,因为它们是可迭代的.因此,您可以使用文件路径字符串填充它们以读取文件,使用不可迭代的数字进行计算甚至可以使用图像进行绘制.
在您的情况下,布局可能如下所示:
import multiprocessing as mp
import numpy as np
import itertools as it
def worker1(in_queue, out_queue):
#holds when nothing is available, stops when 'STOP' is seen
for a in iter(in_queue.get, 'STOP'):
#do something
out_queue.put({a: result}) #return your result linked to the input
def worker2(in_queue, out_queue):
for a in iter(in_queue.get, 'STOP'):
#do something differently
out_queue.put({a: result}) //return your result linked to the input
def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
# your final result
result = {}
in_queue = mp.Queue()
out_queue = mp.Queue()
# fill your input
for a in param_list:
in_queue.put(a)
# stop command at end of input
for n in range(Nworkers):
in_queue.put('STOP')
# setup your worker process doing task as specified
process = [mp.Process(target=function,
args=(in_queue, out_queue), daemon=True) for x in range(Nworkers)]
# run processes
for p in process:
p.start()
# wait for processes to finish
for p in process:
p.join()
# collect your results from the calculations
for a in param_list:
result.update(out_queue.get())
return result
temp = multiprocess_loop_grouped(worker1, param_list, group_size, Nworkers, *args)
map = multiprocess_loop_grouped(worker2, param_list, group_size, Nworkers, *args)
当您担心队列内存不足时,可以使它更具动态性.在进程运行时,您需要填充和清空队列.请参见此示例here.
最后的话:你要求的不是Pythonic.但对新手来说更容易理解;-)
标签:python,python-3-x,multiprocessing,shared-memory,concurrent-futures 来源: https://codeday.me/bug/20190705/1387668.html