编程语言
首页 > 编程语言> > 读书笔记:《流畅的Python》第17章 使用future处理并发

读书笔记:《流畅的Python》第17章 使用future处理并发

作者:互联网

# 第17章 使用future处理并发

"""
内容提要:
    concurrent.futures模块
    future的概念:是一种对象,表示异步执行的操作
        是concurrent.futures和asynicio包的基础
"""

# 17.1 网络下载的三种风格

# 17.1.1 依序下载的脚本
# 依序下载的脚本,另外两个版本会重用其中的几个函数 flags.py

# 17.1.2 使用concurrent.futures模块下载
"""
只要是使用了 ThreadPoolExecutor类和 ProcessPoolExecutor类
这两个类实现的接口能分别在不同的线程或者进程中执行可调用对象
"""
# flags_threadpool.py:多线程下载的脚本

# 17.1.3 future在哪里?
"""
从python3.4起,标准库中有两个Future的类
    concurrent.futures.Future
    asyncio.Future
这两个类的作用相同:
    两个Future类的实例都表示可以已经完成或者尚未完成的延迟计算
future封装待完成的操作,可以放入队列,完成状态可以查询,得到结果(或者抛出异常)
后可以查询结果(或异常)

通常情况下,不应该自己创建future,而只能由并发框架(concurrent.futures或者asyncio)
实例化
"""

# GIL(Global Interpreter Lock):全局解释器锁,python任何时候都只能运行一个线程
# 但是为什么前面的示例能显著缩短下载的时间呢?

# 17.2阻塞型IO和GIL
"""
CPython解释器本身不是线程安全的,因此有了GIL,依次只允许一个线程执行python字节码.
因此,一个python进程通常不能使用多个cpu核心.
然而,标准库中的所有执行阻塞型I/O操作的函数,在等待操作系统返回结果时,都会释放GIL
一个python线程等待网络响应时,阻塞型I/O函数会释放GIL,再运行一个线程.
"""

# 17.3 使用concurrent.futures模块启动进程
# 在cpu密集型作业中可以使用concurrent.futures轻松绕开GIL

# 17.4 实验Executor.map方法
# demo_executor_map.py

# 17.5显示下载进度并处理错误
# 进度条动画模块tqdm
"""
import time
from tqdm import tqdm
for i in tqdm(range(1000)):
    time.sleep(.001)
"""
# flags2_sequential.py
# 负责下载的基本函数

# flags2_threadpool.py完整代码清单

# 17.5.3 线程和多进程的替代解决方案
# 模块:threading
#     multiprocessing

# 17.6本章小结
# 本章通过concurrent.futures模块探讨了并发编程的实现方式

# 依序下载的脚本,另外两个版本会重用其中的几个函数 flags.py
# 依序下载的脚本,另外两个版本会重用其中的几个函数 flags.py

import os
import time
import sys
import requests

# 下载国旗的国家代码
POP20_CC = ('CN IN US ID BR PL BG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
# 下载的基地址
BASE_URL = 'http://flupy.org/data/flags'
# 图片存放目录,需要先创建这个目录,或者使用代码检测再创建
DEST_DIR = 'downloads/'
def save_flag(img,filename):
    '''保存图片'''
    path = os.path.join(DEST_DIR,filename)
    # if not os.path.exists(path):  # 这两行代码用于没有手动创建目录的情况
    #     os.mkdir(path)
    with open(path,'wb') as fp:
        fp.write(img)

def get_flag(cc):
    '''爬取数据'''
    url = '{}/{cc}/{cc}.gif'.format(
        BASE_URL,cc=cc.lower()) # 通过基地址构造目标国旗的url
    resp = requests.get(url)
    return resp.content

def show(text):
    '''显示一个字符串,然后刷新sys.stdout,这样能在一行消息中看到进度'''
    print(text,end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    '''批量依次下载'''
    for cc in cc_list:
        image = get_flag(cc)
        show(cc)
        save_flag(image,cc.lower()+'.gif')
    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time()-t0  # 计算耗时
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count,elapsed))

if __name__ == '__main__':
    main(download_many)
# flags_threadpool.py:多线程下载的脚本
# flags_threadpool.py:多线程下载的脚本
from concurrent import futures
from flags import save_flag,get_flag,show,main

MAX_WORKERS = 20 # 设定最大线程数量
def download_one(cc):
    '''在各个线程中执行的函数'''
    image = get_flag(cc)
    show(cc)
    save_flag(image,cc.lower()+'.gif')
    return cc

def donwload_many(cc_list):
    '''编写并发代码时经常这样重构:
    把依序执行的for循环改成函数,以便并发调用'''
    # 设定工作的线程数量,使用允许的最大值和要处理的数量之间较小的值,以免浪费
    workers = min(MAX_WORKERS,len(cc_list))
    # 使用工作的线程数实例化ThreadPoolExecutor类
    # executor.__exit__方法会调用executor.shutdown(wait=True)方法,
    # 它会在所有线程都执行完毕前阻塞线程
    with futures.ThreadPoolExecutor(workers) as executor:
        # map()方法的作用和内置的map类似,不过download_one会在多
        # 线程中并发调用,map()方法会返回一个生成器
        res = executor.map(download_one,sorted(cc_list))
    return len(list(res))  # 获取结果的数量 如果有线程抛出异常,会在这里抛出

if __name__ == '__main__':
    main(donwload_many)
# flags_threadpool_ac.py:把download_many函数中的executor.map方法换成
# executor.submit方法和futures.as_completed函数,一窥神秘的future
# flags_threadpool_ac.py:把download_many函数中的executor.map方法换成
# executor.submit方法和futures.as_completed函数,一窥神秘的future

from concurrent import futures
from flags import save_flag,get_flag,show,main

MAX_WORKERS = 20 # 设定最大线程数量
def download_one(cc):
    '''在各个线程中执行的函数'''
    image = get_flag(cc)
    show(cc)
    save_flag(image,cc.lower()+'.gif')
    return cc

def donwload_many(cc_list):
    cc_list = cc_list[:5] # 设定5个任务
    # 设定max_workers最大工作线程为3,以便在输出观察待完成的future
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            # executor.submit方法排定可调用对象的执行时间,然后返回一个future
            # 表示一个待执行的操作
            future = executor.submit(download_one,cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc,future))  # 打印国家代码和future
        results = []
        #  futures.as_completed函数在future运行结束后产出future
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result:{!r}'
            print(msg.format(future,res))  # 显示future及其结果
            results.append(res)
    return len(results)

if __name__ == '__main__':
    main(donwload_many)

"""
结果:
Scheduled for BR: <Future at 0x228c3c0abe0 state=running>
Scheduled for CN: <Future at 0x228c4016ac0 state=running>
Scheduled for ID: <Future at 0x228c40275b0 state=running>
Scheduled for IN: <Future at 0x228c4027eb0 state=pending>
Scheduled for US: <Future at 0x228c402e940 state=pending>
# 前三个是running,后两个是pending,表示等待线程可用
CN ID <Future at 0x228c4016ac0 state=finished returned str> result:'CN'
<Future at 0x228c40275b0 state=finished returned str> result:'ID'
BR <Future at 0x228c3c0abe0 state=finished returned str> result:'BR'
IN <Future at 0x228c4027eb0 state=finished returned str> result:'IN'
US <Future at 0x228c402e940 state=finished returned str> result:'US'

5 flags downloaded in 3.07s
"""
# demo_executor_map.py 简单演示map方法
# demo_executor_map.py 简单演示map方法

from time import sleep,strftime
from concurrent import futures

def display(*args):
    '''把传入的参数打印出来并加上时间戳'''
    print(strftime('[%H:%M:%S]'),end=' ')
    print(*args)

def loiter(n):
    '''开始时打印一个消息
    休眠n秒
    结束时再显示一个消息'''
    msg = '{}loiter({}):doing nothing for {}s...'
    display(msg.format('\t'*n,n,n))
    sleep(n)
    msg = '{}loiter({}):done'
    display(msg.format('\t'*n,n))
    return n*10

def main():
    display('Script starting')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter,range(5))
    display('results:',results)
    display("Waiting for individual results:")
    for i,result in enumerate(results):
        display('result {}: {}'.format(i,result))

if __name__ == '__main__':
    main()
# flags2_sequential.py

# 因为缺少一些支持模块,该脚本暂时无法执行
# flags2_sequential.py

# 因为缺少一些支持模块,该脚本暂时无法执行
import os
import time
import sys
import collections
import requests
from tqdm import tqdm

def get_flag(base_url,cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url,cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()
    return resp.content

def download_one(cc,base_url,verbose = False):
    try:
        image = get_flag(base_url,cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
        else:
            raise
    else:
        save_flag(image,cc.lower()+'.gif')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose:
        print(cc,msg)
    return Result(status,cc)

def download_many(cc_list,base_url,verbose,max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res =download_one(cc,base_url,verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code}-{res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection Error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}:{}'.format(cc,error_msg))
    return counter

# flags2_threadpool.py完整代码清单
# flags2_threadpool.py完整代码清单
import collections
from concurrent import futures
import requests
import tqdm
from flags2_sequential import download_one
from flags2_common import main,HTTPStatus

DEFAULT_CONCUR_REQ = 30 # 线程池大小
MAX_CONCUR_REQ = 1000

def download_many(cc_list,base_url,verbose,concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one,cc,base_url,verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm.tqdm(done_iter,total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code}-{res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection Error'
            else:
                error_msg = ''
                status = res.status
            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if verbose and error_msg:
                print('*** Error for {}:{}'.format(cc, error_msg))
        return counter

if __name__ == '__main__':
    main(download_many,DEFAULT_CONCUR_REQ,MAX_CONCUR_REQ)

35岁学python,也不直到为了啥?

标签:__,17,读书笔记,Python,futures,cc,future,msg,import
来源: https://blog.csdn.net/Morbidmuse/article/details/121904480