在python中处理大量文件
作者:互联网
我有大量的报告文件(大约650个文件),需要大约320 M的硬盘,我想处理它们.每个文件中都有很多条目;我应该根据他们的内容计算并记录它们.其中一些是彼此相关的,我应该找到,记录和计算它们;匹配可能在不同的文件中.我写了一个简单的脚本来完成这项工作.我使用了python profiler,它花了大约0.3秒来运行一个单行文件的脚本,有2000行我们需要一半进行处理.但是对于整个目录来说,花了1个半小时才完成.这是我的脚本的样子:
# imports
class Parser(object):
def __init__(self):
# load some configurations
# open some log files
# set some initial values for some variables
def parse_packet(self, tags):
# extract some values from line
def found_matched(self, packet):
# search in the related list to find matched line
def save_packet(self, packet):
# write the line in the appropriate files and increase or decrease some counters
def parse(self, file_addr):
lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
for line in lines:
packet = parse_packet(line)
if found_matched(packet):
# count
self.save_packet(packet)
def process_files(self):
if not os.path.isdir(self.src_dir):
self.log('No such file or directory: ' + str(self.src_dir))
sys.exit(1)
input_dirs = os.walk(self.src_dir)
for dname in input_dirs:
file_list = dname[2]
for fname in file_list:
self.parse(os.path.join(dname[0], fname))
self.finalize_process()
def finalize_process(self):
# closing files
我想将时间减少到至少为当前执行时间的10%.也许多处理可以帮助我或只是当前脚本的一些增强将完成任务.无论如何,你能帮助我吗?
编辑1:
我根据@Reut Sharabani的回答更改了我的代码:
def parse(self, file_addr):
lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
for line in lines:
packet = parse_packet(line)
if found_matched(packet):
# count
self.save_packet(packet)
def process_files(self):
if not os.path.isdir(self.src_dir):
self.log('No such file or directory: ' + str(self.src_dir))
sys.exit(1)
input_dirs = os.walk(self.src_dir)
for dname in input_dirs:
process_pool = multiprocessing.Pool(10)
for fname in file_list:
file_list = [os.path.join(dname[0], fname) for fname in dname[2]]
process_pool.map(self.parse, file_list)
self.finalize_process()
我还在我的类定义之前添加了以下行以避免PicklingError:无法pickle< type'instancemethod'>:属性查找
__builtin __.instancemethod失败:
import copy_reg
import types
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _pickle_method)
我在代码中做的另一件事是不在文件处理期间保持打开的日志文件;我打开并关闭它们以写入每个条目只是为了避免ValueError:关闭文件的I / O操作.
现在的问题是我有一些文件正在被多次处理.我的包也错了.我做错了什么?我应该在for循环之前放入process_pool = multiprocessing.Pool(10)吗?考虑到我现在只有一个目录,它似乎不是问题.
编辑2:
我也尝试过这样使用ThreadPoolExecutor:
with ThreadPoolExecutor(max_workers=10) as executor:
for fname in file_list:
executor.submit(self.parse, fname)
结果是正确的,但需要一个半小时才能完成.
解决方法:
首先,“大约650个大约320M的文件”并不是很多.鉴于现代硬盘容易读写100 MB / s,系统的I / O性能可能不是你的瓶颈(也支持“只需0.3秒即可为一个单独的文件运行2000行” ,这清楚地表明CPU限制).但是,从Python中读取文件的确切方式可能效率不高.
此外,在一个通用的多核系统上运行的简单的基于多处理的体系结构将允许您更快地执行分析(这里不需要涉及芹菜,不需要跨越机器边界).
多处理架构
只需看看多处理,您的架构可能会涉及一个管理器进程(父进程),它定义了一个任务队列和一个工作进程池.管理器(或馈送器)将任务(例如文件名)放入队列,工作人员使用这些任务.完成任务后,工作人员让经理知道,并继续消耗下一个任务.
文件处理方法
这是非常低效的:
lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
for line in lines:
...
readlines()在评估列表推导之前读取整个文件.只有在那之后你再次遍历所有行.因此,您在数据中迭代三次.将所有内容组合到一个循环中,这样您只需迭代一次.
标签:file-processing,python,file 来源: https://codeday.me/bug/20190830/1766171.html