理解python中的yield、yield在协程中的作用以及实现一个简单的事件循环
作者:互联网
Future 和 Task对象
import uuid
class Future:
def __init__(self, loop):
self._result = None
self._done = False
self._callbacks = []
self._loop = loop
# 给_result 属性赋值,_result 的值结束耗时操作返回的数据
def set_result(self, data):
if self._done:
raise RuntimeError("Future 对象不能重复设置值")
self._done = True
self._result = data
if isinstance(data, Future):
self._result = data._result
for callback in self._callbacks:
self._loop.add_ready_task(callback)
# 获取 future对象中的result 值
def result(self):
if self._done:
return self._result
raise RuntimeError("Future对象 值结果还没就绪")
# await 等待
def __await__(self):
# yield 在异步协程中的作用就是:当执行到调用系统发起io操作后,暂停函数的执行,
# 将当前 future 对象返回,并让出执行权
yield self
return self._result
# 添加回调事件,当 set_result方法被调用的时候,将协程的回调对象放到事件循环中进行执行
def add_done_callback(self, callback, *args):
self._callbacks.append(callback)
class Task(Future):
def __init__(self, core, loop):
super(Task, self).__init__(loop)
# core 就是一个协程任务
self.core = core
self._task_id = uuid.uuid4()
self._future = None
# run方法相当于启动器,启动协程任务函数,io耗时操作都必须与future对象进行关联,当执行到 await future对象的时候
# await 触发future 对象中的 __await__ 方法,yield 暂停函数执行,并返回当前future对象,
# t = self.core.send(Node)执行结束, 此时 future 是执行io操作的Future 对象
def run(self):
try:
print(f"{self._task_id} 任务 开始执行")
future = self.core.send(None)
except StopIteration:
self.set_result(self._future)
print(f"{self._task_id} 协程任务 执行结束")
print("-" * 50)
# 当 self.core 第一次send的时候不会出现报错,并将执行io操作中的future对象返回回来,
# future 对象中执行io操作的地方与系统进行交换,当io操作执行完成后会调用future 对象中的 set_result 方法,
# set_result 方法 将io结果挂到future 属性中,并将回调函数重新放到事件循环中进行执行
else:
print(f"{self._task_id} 任务 执行到io耗时操作,将执行权让出去,设置io回调通知")
print("-" * 50)
future.add_done_callback(self)
self._future = future
EventLoop 事件循环对象
import collections
import heapq
import time
from random import random, randint
from threading import Thread
from async_future_task import Future, Task
class EventLoop:
loop = None
# 单例,事件循环只能有一个
def __new__(cls, *args, **kwargs):
if not cls.loop:
cls.loop = super().__new__(cls)
return cls.loop
def __init__(self):
# 已经准备好可以运行的任务队列
self._ready_que = collections.deque()
# 延时任务列表
self._scheduled = []
self.stop = False
# 创建协程任务对象,并添加到可执行队列中
def create_task(self, core, *args):
task = Task(core, self)
self._ready_que.append(task)
return task
# 添加任务到延时任务队列中
def add_delay_task(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args))
# 添加可执行的任务到任务队列中, 这个函数主要是给future对象进行添加回调任务
def add_ready_task(self, task, *args):
self._ready_que.append(task)
def run_forever(self):
while True:
self.exec_task()
if self.stop and len(self._scheduled) == 0 and len(self._ready_que) == 0:
break
def stop_exit(self):
self.stop = True
# 执行任务
def exec_task(self):
t = time.time()
len_scheduled = len(self._scheduled)
for i in range(len_scheduled):
task = heapq.heappop(self._scheduled)
if task[0] <= t:
self._ready_que.append((task[1], task[2]))
else:
heapq.heappush(self._scheduled, task)
break
len_ready = len(self._ready_que)
for i in range(len_ready):
task = self._ready_que.popleft()
# 如果是task 是 Task 对象的话就执行 run方法
if isinstance(task, Task):
task.run()
# 如果不是Task对象的话 就把task当做函数来执行
else:
task[0](*task[1])
# 这是用户层, 用户只需要 await 框架的异步方法就可以了,
# 不需要关系框架底部是如何实现的
async def get_baidu():
# 在调用fake_io 后等待future 对象,此时会触发 future 对象中的 __await__ 方法,又因为 __await__
# 方法中有 yield , 它会暂停函数的执行,返回future本身对象
data = await aiohttp_request_url()
print("异步任务结束, io操作获取到的值是: ", data)
return data
# aiohttp_request_url 模拟的是异步 http请求,
# 该方法模拟的是框架封装好的、执行调用系统io的步骤
async def aiohttp_request_url():
# 创建future 等待对象
future = Future(loop)
# 执行io耗时操作,此时并不等待,只调用,不等待,将耗时操作托管给系统,
# 系统执行完io耗时操作,自动回调future set_result 方法, fake_io 模拟调用系统发起io操作,系统自动回调结果
fake_io(future)
data = await future
# 可以在await 获取到data 或进行一些数据的处理
return data
def fake_io(future):
def sleep():
global task_run_time
# 随机休眠 0-1秒
task_time = random()
task_run_time += task_time
time.sleep(task_time)
# io耗时操作执行完成,模拟系统回调 set_result 方法,给future对象设置随机值
data = randint(1, 10)
future.set_result(data)
Thread(target=sleep).start()
loop = EventLoop()
start_time = time.time()
task_run_time = 0
for _ in range(1000):
loop.create_task(get_baidu())
loop.add_delay_task(2, loop.stop_exit)
loop.run_forever()
print(f"所有任务执行时间:{task_run_time}, 实际执行时间{time.time() - start_time}")
理解图
标签:task,协程,python,self,yield,future,result,._,def 来源: https://www.cnblogs.com/tnan/p/16424272.html