编程语言
首页 > 编程语言> > python – 如果自由工作者可用,如何产生未来

python – 如果自由工作者可用,如何产生未来

作者:互联网

我试图将从大文件的行中提取的信息发送到在某个服务器上运行的进程.

为了加快速度,我想在一些并行的线程中做到这一点.

使用concurrent.futures的Python 2.7后端我试过这个:

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
    for line in f:
        e.submit(send_line_function, line)
f.close()

但是,这是有问题的,因为所有的期货都会立即提交,因此我的机器会耗尽内存,因为完整的文件会被加载到内存中.

我的问题是,如果有一个简单的方法来提供一个新的未来,当一个自由工作者可用.

解决方法:

你可以使用迭代文件的块

for chunk in zip(*[f]*chunksize):

(这是grouper recipe的一个应用程序,它将来自迭代器f的项目收集到大小为chunksize的组中.注意:由于zip在Python3中返回迭代器,因此不会立即使用整个文件.)

import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    for chunk in zip(*[f]*chunksize):
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)

现在,在评论中你正确地指出这不是最佳的.
可能会有一些工人需要很长时间,并且占用了大量的工作.

通常,如果对工作人员的每次呼叫花费大致相同的时间,那么这不是什么大问题.但是,这是一种按需推进文件句柄的方法.它使用threading.Condition来通知喷洒器推进文件句柄.

import logging
import threading
import Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()

def worker(cond, queue):
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle') 
            with cond:
                queue.put(line)
                logger.info('waiting')
                cond.wait()
        for _ in range(num_workers):
            queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()

threads = [threading.Thread(target=worker, args=[cond, queue])]
for t in threads:
    t.start()
for t in threads:
    t.join()

标签:concurrent-futures,python,multithreading
来源: https://codeday.me/bug/20191007/1864475.html