线程队列 concurrent 协程 greenlet gevent
作者:互联网
死锁问题
所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
经典问题:哲学家就餐问题
英语:Dining philosophers problem 是在计算机科学中的一个经典问题,用来演示在并发计算中多线程同步(Synchronization)时产生的问题。
import time
from threading import Thread, Lock
def philosopher1(chopsticks, noodles, name):
chopsticks.acquire()
print('%s 拿到了筷子' % name)
time.sleep(1)
noodles.acquire()
print('%s 端起了面条' % name)
time.sleep(1)
print('%s开始吃面' % name)
time.sleep(2)
noodles.release()
print('%s 放下了面条' % name)
chopsticks.release()
print('%s 放下了筷子' % name)
def philosopher2(chopsticks, noodles, name):
noodles.acquire()
print('%s 端起了面条' % name)
time.sleep(1)
chopsticks.acquire()
print('%s 拿到了筷子' % name)
time.sleep(1)
print('%s开始吃面' % name)
time.sleep(2)
chopsticks.release()
print('%s 放下了筷子' % name)
noodles.release()
print('%s 放下了面条' % name)
if __name__ == '__main__':
chopsticks = Lock()
noodles = Lock()
for i1 in ['亚里士多德', '柏拉图', '卡迪尔']:
t1 = Thread(target=philosopher1, args=(chopsticks, noodles, i1))
t1.start()
for i2 in ['尼采', '苏格拉底', '马克思']:
t2 = Thread(target=philosopher2, args=(chopsticks, noodles, i2))
t2.start()
递归锁解决死锁问题
解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。
import time
from threading import Thread, RLock
def philosopher1(chopsticks, noodles, name):
chopsticks.acquire()
print('%s 拿到了筷子' % name)
time.sleep(1)
noodles.acquire()
print('%s 端起了面条' % name)
time.sleep(1)
print('%s开始吃面' % name)
time.sleep(2)
noodles.release()
print('%s 放下了面条' % name)
chopsticks.release()
print('%s 放下了筷子' % name)
def philosopher2(chopsticks, noodles, name):
noodles.acquire()
print('%s 端起了面条' % name)
time.sleep(1)
chopsticks.acquire()
print('%s 拿到了筷子' % name)
time.sleep(1)
print('%s开始吃面' % name)
time.sleep(2)
chopsticks.release()
print('%s 放下了筷子' % name)
noodles.release()
print('%s 放下了面条' % name)
if __name__ == '__main__':
chopsticks = RLock()
noodles = chopsticks
for i1 in ['亚里士多德', '柏拉图', '卡迪尔']:
t1 = Thread(target=philosopher1, args=(chopsticks, noodles, i1))
t1.start()
for i2 in ['尼采', '苏格拉底', '马克思']:
t2 = Thread(target=philosopher2, args=(chopsticks, noodles, i2))
t2.start()
线程队列
同一个进程下多个线程数据是共享的
为什么先同一个进程下还会去使用队列呢?
因为队列是 管道 + 锁
所以用队列还是为了保证数据的安全
线程队列 queque,用法:from queue import Queue。内置方法与进程中的Queue基本一样
先进先出
from queue import Queue
q = Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''
后进先出
from queue import LifoQueue
q = LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
输出结果
third
second
first
'''
优先级队列
from queue import PriorityQueue
q = PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((3, 'a'))
q.put((2, 'b'))
q.put((1, 'c'))
print(q.get())
print(q.get())
print(q.get())
Python标准模块——concurrent.futures
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor:进程池,提供异步调用
基本方法
submit(fn, *args, **kwargs)
:异步提交任务
map(func, *iterables, timeout=None, chunksize=1)
:取代for循环submit的操作
shutdown(wait=True)
:相当于进程池的pool.close()+pool.join()
操作
- wait=True,等待池内所有任务执行完毕回收完资源后才继续
- wait=False,立即返回,并不会等待池内的任务执行完毕
- 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
- submit和map必须在shutdown之前
result(timeout=None)
:取得结果
add_done_callback(fn)
:回调函数
done()
:判断某一个线程是否完成
cancle()
:取消某个任务
ProcessPoolExecutor 进程池
from concurrent.futures import ProcessPoolExecutor
import time
import os
# 括号内可以传数字不传的化默认开设当前计算机cpu个数进程
p_pool = ProcessPoolExecutor()
'''
池子造出来之后 里面会固定存几个个进程
这5个进程不会出现重复创建和销毁的过程
'''
def task(n):
print(n, os.getpid())
time.sleep(1)
return n ** n
def call_back(res):
print(res.result())
if __name__ == '__main__':
for i in range(1, 25):
p_pool.submit(task, i).add_done_callback(call_back)
# 拿到回调结果
p_pool.shutdown() # 关闭进程池等进程池中所有任务运行完毕 再往下执行
print('主')
ThreadPoolExecutor 线程池
#介绍
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
#用法
与ProcessPoolExecutor相同
总结
from concurrent.futures import ProcessPoolExecutor
p_pool = PProcessPoolExecutor()
p_pool.submit(task, i).add_done_callback(call_back)
协程
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
需要强调的是:
- python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
- 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换。
优点如下:
- 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
- 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
- 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
- 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
总结协程特点:
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))
'''
进程:资源单位
线程:执行单位
协程:单线程下实现并发
自己在代码层面上检测我们所有的IO操作
一旦遇到了IO了 我们在代码级别完成切换
这样给cpu的感觉是你这个程序一直在运行,没有IO
从而提升程序运行效率
多道技术:
切换+保存状态
cpu两种切换
1、程序遇到IO
2、程序长时间占用
代码是如何做到的
切换+保存状态
切换
切换不一定是提升效率,也有可能是降低效率
比如计算密集型切换起来可能还会降低效率
保存状态
保存上一次执行的状态 暂时挂起 下一次来接着上一次的操作继续往后面执行
'''
gevent模块
from gevent import monkey
monkey.patch_all()
import time
from gevent import spawn
'''
gevent模块本身无法监测常见的一些IO操作
在使用的时候需要额外导入一个
from gevent import monkey
monkey.patch_all()
'''
def eat():
print('吃了一口')
time.sleep(2)
print('又吃了一口')
def play():
print('玩了一下')
time.sleep(3)
print('又玩了一下')
start = time.time()
g1 = spawn(eat)
g2 = spawn(play)
g1.join()
g2.join()
print(time.time()-start)
greenlet模块
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name)
g2.switch('nick')
print('%s eat 2' %name)
g2.switch()
def play(name):
print('%s play 1' %name)
g1.switch()
print('%s play 2' %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('nick')#可以在第一次switch时传入参数,以后都不需要
# 单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度。手动切换
标签:协程,name,concurrent,gevent,线程,chopsticks,time,print,noodles 来源: https://www.cnblogs.com/wuzhixian/p/15050313.html