编程语言
首页 > 编程语言> > 理解python中的yield、yield在协程中的作用以及实现一个简单的事件循环

理解python中的yield、yield在协程中的作用以及实现一个简单的事件循环

作者:互联网

Future 和 Task对象

import uuid


class Future:
    def __init__(self, loop):
        self._result = None
        self._done = False
        self._callbacks = []
        self._loop = loop

    # 给_result 属性赋值,_result 的值结束耗时操作返回的数据
    def set_result(self, data):
        if self._done:
            raise RuntimeError("Future 对象不能重复设置值")
        self._done = True
        self._result = data

        if isinstance(data, Future):
            self._result = data._result

        for callback in self._callbacks:
            self._loop.add_ready_task(callback)

    # 获取 future对象中的result 值
    def result(self):
        if self._done:
            return self._result
        raise RuntimeError("Future对象 值结果还没就绪")

    # await 等待
    def __await__(self):
        # yield 在异步协程中的作用就是:当执行到调用系统发起io操作后,暂停函数的执行,
        # 将当前 future 对象返回,并让出执行权
        yield self
        return self._result

    # 添加回调事件,当 set_result方法被调用的时候,将协程的回调对象放到事件循环中进行执行
    def add_done_callback(self, callback, *args):
        self._callbacks.append(callback)


class Task(Future):
    def __init__(self, core, loop):
        super(Task, self).__init__(loop)
        # core 就是一个协程任务
        self.core = core
        self._task_id = uuid.uuid4()
        self._future = None

    # run方法相当于启动器,启动协程任务函数,io耗时操作都必须与future对象进行关联,当执行到 await future对象的时候
    # await 触发future 对象中的 __await__ 方法,yield 暂停函数执行,并返回当前future对象,
    #  t = self.core.send(Node)执行结束, 此时 future 是执行io操作的Future 对象
    def run(self):
        try:
            print(f"{self._task_id} 任务 开始执行")
            future = self.core.send(None)
        except StopIteration:
            self.set_result(self._future)
            print(f"{self._task_id} 协程任务 执行结束")
            print("-" * 50)
        # 当 self.core 第一次send的时候不会出现报错,并将执行io操作中的future对象返回回来,
        # future 对象中执行io操作的地方与系统进行交换,当io操作执行完成后会调用future 对象中的 set_result 方法,
        # set_result 方法 将io结果挂到future 属性中,并将回调函数重新放到事件循环中进行执行
        else:
            print(f"{self._task_id} 任务 执行到io耗时操作,将执行权让出去,设置io回调通知")
            print("-" * 50)
            future.add_done_callback(self)
            self._future = future

EventLoop 事件循环对象

import collections
import heapq
import time
from random import random, randint
from threading import Thread

from async_future_task import Future, Task


class EventLoop:
    loop = None

    # 单例,事件循环只能有一个
    def __new__(cls, *args, **kwargs):
        if not cls.loop:
            cls.loop = super().__new__(cls)
        return cls.loop

    def __init__(self):
        # 已经准备好可以运行的任务队列
        self._ready_que = collections.deque()

        # 延时任务列表
        self._scheduled = []

        self.stop = False

    # 创建协程任务对象,并添加到可执行队列中
    def create_task(self, core, *args):
        task = Task(core, self)
        self._ready_que.append(task)
        return task

    # 添加任务到延时任务队列中
    def add_delay_task(self, delay, callback, *args):
        t = time.time() + delay
        heapq.heappush(self._scheduled, (t, callback, args))

    # 添加可执行的任务到任务队列中, 这个函数主要是给future对象进行添加回调任务
    def add_ready_task(self, task, *args):
        self._ready_que.append(task)

    def run_forever(self):
        while True:
            self.exec_task()
            if self.stop and len(self._scheduled) == 0 and len(self._ready_que) == 0:
                break

    def stop_exit(self):
        self.stop = True

    # 执行任务
    def exec_task(self):
        t = time.time()
        len_scheduled = len(self._scheduled)

        for i in range(len_scheduled):
            task = heapq.heappop(self._scheduled)
            if task[0] <= t:
                self._ready_que.append((task[1], task[2]))
            else:
                heapq.heappush(self._scheduled, task)
                break

        len_ready = len(self._ready_que)
        for i in range(len_ready):
            task = self._ready_que.popleft()
            # 如果是task 是 Task 对象的话就执行 run方法
            if isinstance(task, Task):
                task.run()
            # 如果不是Task对象的话 就把task当做函数来执行
            else:
                task[0](*task[1])


# 这是用户层, 用户只需要 await 框架的异步方法就可以了,
# 不需要关系框架底部是如何实现的
async def get_baidu():
    # 在调用fake_io 后等待future 对象,此时会触发 future 对象中的 __await__ 方法,又因为 __await__
    # 方法中有 yield , 它会暂停函数的执行,返回future本身对象
    data = await aiohttp_request_url()
    print("异步任务结束, io操作获取到的值是: ", data)
    return data


# aiohttp_request_url 模拟的是异步 http请求,
# 该方法模拟的是框架封装好的、执行调用系统io的步骤
async def aiohttp_request_url():
    # 创建future 等待对象
    future = Future(loop)

    # 执行io耗时操作,此时并不等待,只调用,不等待,将耗时操作托管给系统,
    # 系统执行完io耗时操作,自动回调future set_result 方法, fake_io 模拟调用系统发起io操作,系统自动回调结果
    fake_io(future)

    data = await future
    # 可以在await 获取到data 或进行一些数据的处理

    return data


def fake_io(future):
    def sleep():
        global task_run_time

        # 随机休眠 0-1秒
        task_time = random()
        task_run_time += task_time
        time.sleep(task_time)
        # io耗时操作执行完成,模拟系统回调 set_result 方法,给future对象设置随机值
        data = randint(1, 10)
        future.set_result(data)

    Thread(target=sleep).start()


loop = EventLoop()

start_time = time.time()
task_run_time = 0

for _ in range(1000):
    loop.create_task(get_baidu())

loop.add_delay_task(2, loop.stop_exit)
loop.run_forever()
print(f"所有任务执行时间:{task_run_time}, 实际执行时间{time.time() - start_time}")

理解图

请添加图片描述
参考b站大佬 DavyCloud asyncio系列教程

标签:task,协程,python,self,yield,future,result,._,def
来源: https://www.cnblogs.com/tnan/p/16424272.html