python asyncio协程动态添加任务、协程池
作者:互联网
文章目录
asyncio 协程介绍:
- 动态添加任务:
- 方案是创建一个线程,使事件循环在线程内永久运行
- 设置守护进程,随着主进程一起关闭
- 自动停止任务
- 阻塞任务完成
- 协程池
- 队列自带阻塞机制,当队列满了后会阻塞,因此可以取代 asyncio.Semaphore()
demo
import asyncio
import aiohttp
import time
import nest_asyncio
import queue
from threading import Thread
class AsyncPool(object):
"""
1. 支持动态添加任务
2. 支持自动停止事件循环
3. 支持最大协程数
"""
def __init__(self, loop=None, maxsize=0):
"""
初始化
:param loop:
:param maxsize: 默认0,不限制队列
"""
# 在jupyter需要这个,不然asyncio运行出错
nest_asyncio.apply()
# 获取一个事件循环
if not loop:
self.loop = asyncio.new_event_loop()
# 队列,先进先出,根据队列是否为空判断,退出协程
self.q = queue.Queue(maxsize)
self.loop_thread = None
if self.loop:
self.start_thread_loop()
def add(self, item=1):
"""
添加任务
:param item:
:return:
"""
self.q.put(item)
def done(self, fn):
"""
任务完成
回调函数
:param fn:
:return:
"""
if fn:
pass
self.q.get()
self.q.task_done()
def wait(self):
"""
等待任务执行完毕
:return:
"""
self.q.join()
@staticmethod
def _start_thread_loop(loop):
"""
运行事件循环
:param loop: loop以参数的形式传递进来运行
:return:
"""
# 将当前上下文的事件循环设置为循环。
asyncio.set_event_loop(loop)
# 开始事件循环
loop.run_forever()
def start_thread_loop(self):
"""
运行事件循环
:return:
"""
self.loop_thread = Thread(target=self._start_thread_loop, args=(self.loop,))
# 设置守护进程
self.loop_thread.setDaemon(True)
# 运行线程,同时协程事件循环也会运行
self.loop_thread.start()
def stop_thread_loop(self, loop_time=1):
"""
队列为空,则关闭线程
:param loop_time:
:return:
"""
async def _close_thread_loop():
"""
关闭线程
:return:
"""
while True:
if self.q.empty():
self.loop.stop()
break
await asyncio.sleep(loop_time)
# 等待关闭线程
asyncio.run_coroutine_threadsafe(_close_thread_loop(), self.loop)
def submit(self, func, callback=None):
"""
提交任务到事件循环
:param func: 异步函数对象
:param callback: 回调函数
:return:
"""
# 将协程注册一个到运行在线程中的循环,thread_loop 会获得一个环任务
# 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
future = asyncio.run_coroutine_threadsafe(func, self.loop)
# 回调函数封装
def callback_done(_future):
try:
if callback:
callback(_future)
finally:
self.done(_future)
# 添加回调函数
future.add_done_callback(callback_done)
def release(self, loop_time=1):
"""
释放线程
:param loop_time:
:return:
"""
self.stop_thread_loop(loop_time)
def running(self):
"""
获取当前线程数
:return:
"""
return self.q.qsize()
async def thread_example(i):
url = "http://127.0.0.1:8080/app04/async4?num={}".format(i)
async with aiohttp.ClientSession() as session:
async with session.get(url) as res:
# print(res.status)
# print(res.content)
return await res.text()
def my_callback(future):
result = future.result()
print('返回值: ', result)
def main():
# 任务组, 最大协程数
pool = AsyncPool(maxsize=10000)
# 插入任务任务
for i in range(100000):
pool.add()
pool.submit(thread_example(i), my_callback)
# 停止事件循环
pool.release()
# 等待
pool.wait()
print("等待子线程结束...")
if __name__ == '__main__':
start_time = time.time()
main()
end_time = time.time()
print("run time: ", end_time - start_time)
标签:协程,thread,python,self,协程池,def,time,loop,asyncio 来源: https://blog.csdn.net/weixin_43968923/article/details/110739088