第五十六篇 并发之协程
作者:互联网
目录
一、引子
1.线程队列
from queue import Queue, LifoQueue, PriorityQueue
1.普通线程队列
和JoinableQueue队列相似
# 普通线程队列
q = Queue(2) # 设置可以最多放入的元素个数
q.put('a')
q.put('b')
# q.put('c') # 如果要放入的元素或要取出的元素大于设定的个数,则会卡死
print(q.get())
q.task_done() # 调用task_done的个数和存入的个数相同时,join函数就不再阻塞程序
print(q.get())
q.task_done() # 可以将多个task_done放在一起,也有效,只要和存入的元素个数相同即可
q.join() # 阻塞函数,阻塞将一直持续到task_done的调用次数和存入线程队列的元素个数相等为止
print('over')
'''
a
b
over
'''
2.LifoQueue
1.lifo:last in first out 后进先出队列,用于模拟栈
2.和普通线程队列只有取元素的顺序不同,其他一样
lifoq = LifoQueue()
lifoq.put('a')
lifoq.put('b')
print(lifoq.get())
print(lifoq.get())
'''
b
a
'''
3.PriorityQueue
1.具备优先级的队列,取出数据时,会比较大小,越小的数据优先级越高
# 对于数据则直接比较大小,对于字符串则比较首字母在字母表中的先后顺序(数字的优先级高于字母的优先级)
pq1 = PriorityQueue()
pq1.put(20)
pq1.put(2)
print(pq1.get())
print(pq1.get())
'''
2
20
'''
pq2 = PriorityQueue()
pq2.put('sir')
pq2.put('ace')
print(pq2.get())
print(pq2.get())
'''
ace
sir
'''
# 对于有序的容器对象,会先比较第一个元素的大小,如果相同再比较后面的相同位置上的元素大小,如果为空,则优先级最高
pq3 = PriorityQueue()
pq3.put([2,4])
pq3.put([1,5])
pq3.put([1])
print(pq3.get())
print(pq3.get())
'''
[1]
[1, 5]
[2, 4]
'''
2.如果存入的是一个自定义对象,我们可以通过运算符重载来规定比较规则,使得对象可以被比较
class Person:
def __init__(self, name, age):
self.age = age
self.name = name
# 覆盖比较运算符:当在两个对象之间使用比较运算符时,会自动执行该方法
def __lt__(self, other):
# 先比较年龄,如果相同,则比较姓名中字母的有序顺序
# 返回的是bool值,当我们使用优先级队列时,它会根据bool值,谁小就返回谁
if self.age == other.age:
return self.name < other.name
return self.age < other.age
q = PriorityQueue()
p1 = Person('king', 20)
p2 = Person('tom', 18)
q.put(p1)
q.put(p2)
print(q.get().name)
print(q.get().name)
'''
tom
king
'''
2.背景
1.上节课中我们知道GIL锁将导致CPython中的多线程无法并行执行,只能并发的执行,而实现并发的原理是切换+保存,那就意味着使用多线程实现并发,就需要为每一个任务创建一个线程,必然增加了线程创建/销毁/切换/保存所带来的开销
2.高并发下,由于任务数量太多导致无法开启新的线程,会存在既没有实际任务要执行,也无法创建新线程来处理新任务的情况
3.既要保证并发效果,也要避免创建线程带来的开销问题,在这个背景下,协程出现了,协程的原理是使用单线程来实现多任务并发
二、单线程实现并发
1.可行性
1.并发:多个任务同时处理,其实是切换加保存,由于CPU运行速度极快,所以看上去是同时进行
2.并行:利用多核CPU,真正实现多个任务同时处理
3.早期的计算机只有一个CPU,通过CPU切换线程来实现并发,所以线程内实现并发理论上是可行的
2.如何实现
并发 = 切换任务 + 保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,就可以实现单线程并发
1.用yield实现
1.python中的生成器就具备这样一个特点,每次调用next方法,都会回到生成器函数中执行代码,意味着任务之间可以切换,并且是基于上一次运行的结果,也即是生成器会自动保存执行状态
def task1():
while True:
yield # 不会终止函数,且每次返回一个值,并保留当前状态以供下次调用
print('task1 start')
def task2():
t = task1() # 运行生成器函数
while True:
next(t) # 利用next方法,每次循环都会去生成器中运行代码
print('task2 start')
task2()
import time
# 将注释取消就是协程
def task1():
a = 0
for i in range(10000):
a +=1
# yield
def task2():
# t = task1()
a = 0
for i in range(10000):
a += 1
# next(t)
start_t = time.time()
task2()
print(time.time() - start_t)
# 单线程下串行两个任务,效率反而比线程内并发高,因为并发要切换加保存
2.对于纯计算任务,单线程并发效率比串行还低,所以我们需要用在io操作多的任务中,但是yield生成器方案无法解决阻塞问题,而且如果任务比较多时,代码将非常复杂
3.greenlet模块
1.greenlet模块简化了yield复杂的代码结构,实现了单线程多任务并发
2.但是无论直接使用yield还是greenlet都不能检测IO操作,遇到IO时都会进入阻塞状态,都对纯计算任务而言效率没有提升
3.通过greenlet模块导入greenlet类,实例化对象之后,需要将切换执行的两个任务通过switch方法放入对方的函数内,来不断实现切换加保存,并通过两个任务中的一个实例化对象使用switch方法来开启协程
from greenlet import greenlet
def task1():
a = 0
for i in range(10000):
a +=1
t2.switch() # 用于切换加保存
def task2():
a = 0
for i in range(10000):
a += 1
t1.switch()
start_t = time.time()
t1 = greenlet(task1)
t2 = greenlet(task2)
t1.switch() # 开启任意一个任务即可
print(time.time() - start_t)
三、协程
1.协程是什么
1.单线程下的并发,又称为微线程、纤程,英文名Coroutine。是一种用户态的轻量级线程,即协程是由用户程序自身控制调度的
2.对比操作系统控制线程的切换,用户在单线程内控制协程的切换
3.详解
# 1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
# 2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
4.优点和缺点
优点:
# 1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
# 2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点:
# 1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
# 2. 协程本质是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
2.gevent模块
1.gevent简介
1.由于greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时,如果遇到IO,就会原地阻塞,任然实现不了遇到IO自动切换以提高效率的并发效果
2.任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1遇到阻塞时,就利用阻塞的时间去执行任务2,如此,才能提高效率,这里就需要Gevent模块
3.Gevent 是一个第三方库,可以轻松通过gevent实现并发编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度
2.gevent如何使用
1.直接导入gevent类,并通过gevent.spawn实例化一个协程对象,spawn里面第一个参数是目标函数,后面的参数可以是目标函数所需要的参数(位置实参、关键字实参皆可)
2.必须至少有一个join函数来控制协程的运行,否则将只会执行主线程
# 执行没有加join函数的代码时不会输出任何消息
# 这是因为协程任务都是以异步方式提交,所以主线程会继续往下执行,而一旦执行完最后一行主线程也就结束了,
# 导致了协程任务没有来的及执行,所以这时候必须join来让主线程等待协程任务执行完毕,也就是让主线程保持存活
# 后续在使用协程时也需要保证主线程一直存活,如果主线程不会结束也就意味着不需要调用join
3.gevent.joinall()函数里面的参数是一个容器,可以将多个协程对象放进去
4.仍然无法解决阻塞时自动切换的问题
import gevent, time
# 会一直等到task1执行完,才会执行task2中的代码
def task1(name):
print('%s start'%name)
time.sleep(2) # 模拟阻塞
print('%s end'%name)
def task2():
print('task2 start')
print('task2 end')
g1 = gevent.spawn(task1, 'king')
g2 = gevent.spawn(task2)
# g1.join()
gevent.joinall([g1, g2])
print('over')
3.monkey补丁
1.monkey补丁的原理是把原始的阻塞方法替换为修改后的非阻塞方法,偷梁换柱,来实现IO自动切换
monkey补丁实现原理(举例):
# myjson.py文件中
def dump():
print("一个被替换的 dump函数")
def load():
print("一个被替换的 load函数")
# test.py 文件中
import myjson
import json
# 补丁函数示例
def monkey_pacth_json():
json.dump = myjson.dump
json.load = myjson.load
# 打补丁
monkey_pacth_json()
# 测试是否替换
json.dump()
json.load()
'''
一个被替换的 dump函数
一个被替换的 load函数
'''
2.通过monkey中的patch_all方法可以实现,遇到IO阻塞自动切换任务的并发效果
3.必须在打补丁后再使用相应的功能,避免忘记,建议写在最上方
from gevent import monkey # 通过gevent导入monkey补丁
monkey.patch_all() # 打补丁
import gevent, time
def task1(name):
print('%s start'%name)
time.sleep(2) # 模拟阻塞
print('%s end'%name)
def task2():
print('task2 start')
time.sleep(3)
print('task2 end')
g1 = gevent.spawn(task1, 'king')
g2 = gevent.spawn(task2)
gevent.joinall([g1, g2])
print('over')
3.案例
1.爬虫
from gevent import monkey
monkey.patch_all()
import gevent, requests, time
def get_page(url):
print('get %s'%url)
response = requests.get(url)
if response.status_code == 200: #
print('%d bytes received from %s'%(len(response.text), url))
start_time = time.time()
gevent.joinall([
gevent.spawn(get_page, 'https://www.python.org/'),
gevent.spawn(get_page,'https://github.com/'),
gevent.spawn(get_page,'https://baidu.com/'),
])
print(time.time() - start_time)
2.TCP通讯
# 服务器
from gevent import monkey
monkey.patch_all()
#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
import gevent
from socket import *
def server_task(server_ip, port):
s = socket.socket()
s.bind((server_ip, port))
s.listen()
while True:
c, addr = s.accept()
g = gevent.spawn(talk_task, c)
def talk_task(c):
while True:
msg = c.recv(1024).decode('utf-8')
c.send((msg.upper()).encode('utf-8'))
server_task()
# 客户端
from socket import *
import os
from threading import Thread, current_thread
c = socket()
c.connect(('127.0.0.1', 8000))
def client_task():
while True:
msg = '%s : %s'%(os.getpid(), current_thread().name)
c.send(msg.encode('utf-8'))
data = c.recv(1024).decode('utf-8')
print(data)
for i in range (100):
t = Thread(target=client_task)
t.start()
标签:之协程,协程,并发,gevent,线程,time,print,第五十六,def 来源: https://www.cnblogs.com/itboy-newking/p/11185358.html