深入剖析concurrent.futures
作者:互联网
文章目录
模拟一个故事,从百度图片上面随便找几张美女图片,下载下来,保存到本地,先按照顺序下载
1 顺序同步下载
import random
import time
import requests
urls = [
"https://t7.baidu.com/it/u=3676218341,3686214618&fm=193&f=GIF",
"https://t7.baidu.com/it/u=3930750564,2979238085&fm=193&f=GIF",
"https://pics7.baidu.com/feed/c8ea15ce36d3d5398b62865e47680d55372ab0c1.jpeg?token=43cb8aff8adfd6c74ec99218af7a3aad&s=FD36AD570CBC56949920F8E803003021",
"https://pics4.baidu.com/feed/00e93901213fb80ea99ee55b212dcb28bb3894f6.jpeg?token=910769ca2750ca2900cb28542616f7c2",
"https://gimg2.baidu.com/image_search/src=http%3A%2F%2Finews.gtimg.com%2Fnewsapp_match%2F0%2F11158692545%2F0.jpg&refer=http%3A%2F%2Finews.gtimg.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg?sec=1638945903&t=ab53b7ec3f91652eacf7499b1a4ff529"
]
def use_time(func):
def inner(*args, **kwargs):
s = time.time()
func(*args, **kwargs)
print(f"共消耗了{time.time()-s}s")
return inner
def download(url):
headers = {
'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
}
resp = requests.get(url, headers=headers)
return resp.content
def sava(content):
with open(f'{random.randint(0,100)}.jpg','wb') as f:
f.write(content)
@use_time
def main():
for url in urls:
resp = download(url)
sava(resp)
if __name__ == '__main__':
main()
顺序下载大概使用了15s时间
2 concurrent.futures并发下载
map函数
from concurrent.futures import ThreadPoolExecutor
from test4 import download, sava, urls, use_time
MAX_WORKER = 10
@use_time
def cmain():
resp = ThreadPoolExecutor(max_workers=min(len(urls), MAX_WORKER)).map(
download, urls
)
for _ in resp:
sava(_)
if __name__ == '__main__':
cmain()
快了大概30倍,这只是小任务量的并发,如果任务量足够大,效率可想而知。
map方法与内置的map方法差不多,返回值是一个生成器,包含所有参数列表的返回值。
map返回值是一个包含Future对象的迭代器,迭代器的__next__ 方法调用各个Future对象的 result 方法,因此我们得到的是各个Future的结果,而非Future本身。
submit函数
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
urls = [
"https://t7.baidu.com/it/u=3676218341,3686214618&fm=193&f=GIF",
"https://t7.baidu.com/it/u=3930750564,2979238085&fm=193&f=GIF",
"https://pics7.baidu.com/feed/c8ea15ce36d3d5398b62865e47680d55372ab0c1.jpeg?token=43cb8aff8adfd6c74ec99218af7a3aad&s=FD36AD570CBC56949920F8E803003021",
"https://pics4.baidu.com/feed/00e93901213fb80ea99ee55b212dcb28bb3894f6.jpeg?token=910769ca2750ca2900cb28542616f7c2",
"https://gimg2.baidu.com/image_search/src=http%3A%2F%2Finews.gtimg.com%2Fnewsapp_match%2F0%2F11158692545%2F0.jpg&refer=http%3A%2F%2Finews.gtimg.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg?sec=1638945903&t=ab53b7ec3f91652eacf7499b1a4ff529"
]
def use_time(func):
def inner(*args, **kwargs):
s = time.time()
func(*args, **kwargs)
print(f"共消耗了{time.time()-s}s")
return inner
def download(url):
headers = {
'User-Agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
}
resp = requests.get(url, headers=headers)
return resp.content
def sava(content):
with open(f'{random.randint(0,100)}.jpg','wb') as f:
f.write(content)
@use_time
def main():
fu_list = []
with ThreadPoolExecutor(3) as the:
for url in urls:
fu = the.submit(download, url)
print(f"url-{url}->fu-{fu}")
fu_list.append(fu)
for fu in as_completed(fu_list):
print(f"state:{fu}")
resp = fu.result()
sava(resp)
if __name__ == '__main__':
main()
url-https://t7.baidu.com/it/u=3676218341,368621461 ->fu-<Future at 0x21a64dc6550 state=running>
url-https://t7.baidu.com/it/u=3930750564,2979238085->fu-<Future at 0x21a64dd6d00 state=running>
url-https://pics7.baidu.com/feed/c8ea15ce36d3d5398b->fu-<Future at 0x21a64de0910 state=running>
url-https://pics4.baidu.com/feed/00e93901213fb80ea9->fu-<Future at 0x21a64ded310 state=pending>
url-https://gimg2.baidu.com/image_search/src=http%3->fu-<Future at 0x21a64dedac0 state=pending>
state:<Future at 0x21a64dc6550 state=finished returned bytes>
state:<Future at 0x21a64dd6d00 state=finished returned bytes>
state:<Future at 0x21a64de0910 state=finished returned bytes>
state:<Future at 0x21a64dedac0 state=finished returned bytes>
state:<Future at 0x21a64ded310 state=finished returned bytes>
共消耗了0.7982451915740967s
使用两个for循环的结果其实和map的结果是一样的,唯一的不同是map的结果与参数不能保证一一对应,通过submit函数和Future对象能够一一对应。
executor.submit 和 futures.as_completed 这个组合比 executor.map 更灵活,因为 submit 方法能处理不同的可调用对象和参数,而 executor.map 只能处理参数不同的同一个可调用对象。此外,传给 futures.as_completed 函数的Future集合可以来自多个 Executor 实例,例如一些由 ThreadPoolExecutor 实例创建,另一些由 ProcessPoolExecutor 实例创建。
3 GIL、多核心CPU与进程池
Python 标准库中的所有阻塞型 I/O 函数都会释放 GIL,允许其他线程运行。time.sleep() 函数也会释放 GIL。因此,尽管有 GIL,Python 线程还是能在 I/O密集型应用中发挥作用。
如果想利用多核心,那么请移步多进程,concurrent.futures同样提供了进程池对多进程提供支持
concurrent.futures模块实现的是真正的并行计算,因为它使用ProcessPoolExecutor 类把工作分配给多个 Python 进程处理。因此,如果需要做 CPU密集型处理,使用这个模块能绕开 GIL,利用所有可用的 CPU 核心。
import os
from concurrent.futures import ProcessPoolExecutor
from test4 import download, sava, urls, use_time
@use_time
def cmain():
# resp = ThreadPoolExecutor(max_workers=min(len(urls), MAX_WORKER)).map(
# download, urls
# )
with ProcessPoolExecutor() as ppe:
resp = ppe.map(download, urls)
for _ in resp:
sava(_)
if __name__ == '__main__':
cmain()
ThreadPoolExecutor与ProcessPoolExecutor类都继承于同一个接口,不同的一点是,ProcessPoolExecutor的max_worker是可选的,默认值为os.cpu_count(), 也就是看电脑的最多CPU数量。
使用进程池代替线程池进行网络资源下载,速度反而慢了,我猜测可能是应该创建进程和分配资源的时间可能更长。
从这个角度考虑,进程池适用于计算密集型任务,而不适用于IO密集型计算。
标签:__,baidu,fu,futures,剖析,concurrent,https,time,com 来源: https://blog.csdn.net/kobe_okok/article/details/121207785