其他分享
首页 > 其他分享> > asyncio-事件循环

asyncio-事件循环

作者:互联网

事件循环event loop 是asyncio的核心,会运行异步任务和回调,执行网络IO操作,运行子进程。
一个事件循环同一时刻只运行一个任务,只是利用了任务的等待时间,当某个任务需要等待某种资源or某种操作完成的时候,event loop转去调度其他task让其运行。

获取事件循环

  1. asyncio.get_running_loop()
    • 返回当前os线程中正在运行的事件循环,如果没有正在运行的事件循环会引发RuntimeError此函数只能由协程或回调来调用。3.7新功能
  2. asyncio.get_event_loop()
    • 获取当前事件循环。如果当前os线程中没有设置当前事件循环,该os线程是主线程,set_event_loop()还没有被调用,则asyncio将创建一个新的事件循环并将其设为当前事件循环。3.10版本后移除,将会成为get_running_loop()的别名。
  3. asyncio.set_event_loop(loop)
    • 将loop设置未当前OS线程的当前事件循环
  4. asyncio.new_event_loop()
    • 创建并返回一个新的事件循环对象

运行和停止循环

  1. loop.run_until_complete(future)

    • 运行直到Future被完成。如果参数是协程对象,将被隐式调度为asyncio.Task来运行。
    • 返回Future的结果或引发相关异常
  2. loop.run_forever()

    • 运行事件循环直到stop()被调用
    • 如果stop()在调用run_forever()之前被调用,循环将轮询一次I/O选择器并设置超时未0,再运行所有已加入计划任务的回调来响应I/O事件,然后退出。
    • 如果stop再run_forever运行期间调用,循环将运行当前批次的回调然后退出。在此情况下由回调加入计划任务的新回调将不会运行;它们将会在下次 run_forever() 或 run_until_complete() 被调用时运行。
  3. loop.stop()

    • 停止事件循环
  4. loop.is_running()

    • 返回 True 如果事件循环当前正在运行
  5. loop.is_closed()

    • 如果事件循环已经被关闭,返回 True 。
  6. loop.close()

    • 关闭事件循环。此函数调用时循环必须除以非运行状态,pending状态的回调将被丢弃。清楚所有的队列并理机关闭执行器,不会等待执行器完成。
    • 这个方法是幂等和不可逆的,调用后不应调用其他方法。

安排回调

  1. loop.call_soon(callback, *args, context=None)
  1. loop.call_soon_threadsafe(callback,*args, context=None)
    • call_soon的线程安全体,必须用于安排其他线程的回调
    • 如果在已关闭的循环上调用则会引发RuntimeError,这可能会在主应用程序被关闭时在二级线程上发生
import asyncio
import time 
def hello_world(loop):
    print(f"callback at {time.strftime('%X')}")
    print("hello world")
    loop.stop()

loop = asyncio.get_event_loop()
loop.call_soon(hello_world, loop)
try:
    loop.run_forever()
finally:
    loop.close()

调度延迟回调

安排调度函数在将来的某个时刻调用的机制,事件循环使用单调时钟来跟踪事件。

  1. loop.call_later(delay, callback, *args, context=None)
    • 在delay秒后调用callback,返回一个 asyncio.TimerHandle 实例,该实例能用于取消回调
    • callback 只被调用一次。如果两个回调被安排在同样的时间点,执行顺序未限定。
    • 可选的位置参数 args 在被调用的时候传递给 callback 。 如果你想把关键字参数传递给 callback ,请使用 functools.partial() 。
    • 可选键值类的参数 context 允许 callback 运行在一个指定的自定义 contextvars.Context 对象中。如果没有提供 context ,则使用当前上下文。
import asyncio
import datetime 

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1)<end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
end_time = loop.time()+5
loop.call_soon(display_date, end_time, loop)

try:
    loop.run_forever()
finally:
    loop.close()
  1. loop.call_at(when, callback,*args, context=None)
    • 在给定的绝对时间戳when被调用,使用loop.time同样的时间参考。
    • 其他与call_later方法相同
  2. loop.time()根据时间循环内部的单调时钟,返回当前时间为一个float值。

创建Future和task

  1. loop.create_future()

    • 创建一个附加到事件循环中的asyncio.Future对象在asyncio中创建Futures的首选方式。
  2. loop.create_task(coro, *,name=None):

    • 创建一个task
  3. loop.set_task_factory(factory)

    • 设置一个任务工厂,它将由 loop.create_task() 来使用。
    • 如果 factory 为 None 则将设置默认的任务工厂。 在其他情况下,factory 必须为一个 可调用对象 且签名匹配 (loop, coro),其中 loop 是对活动事件循环的引用,而 coro 是一个协程对象。 该可调用对象必须返回一个兼容 asyncio.Future 的对象。
  4. loop.get_task_factory()

    • 返回一个任务工厂,或者如果是使用默认值则返回 None。

在线程or进程池中执行代码

  1. awaitable loop.run_in_executor(executor, func, *args)
    • 这个方法返回一个 asyncio.Future 对象。
import asyncio
import concurrent.futures

def block_io():
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    return sum(i*i for i in range(10**7))

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, block_io)
    print("default thread pool", result)

    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, block_io)
        print("custom thread pool", result)

    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound)
        print("custom process pool", result)

asyncio.run(main())

标签:调用,run,callback,循环,事件,loop,asyncio
来源: https://www.cnblogs.com/baiyutang7/p/16414353.html