其他分享
首页 > 其他分享> > 如何等待另一个线程提交的回调完成?

如何等待另一个线程提交的回调完成?

作者:互联网

我有两个共享状态的Python线程A和B.有一点,A提交了一个由B在其循环上运行的回调,如下所示:

# This line is executed by A
loop.call_soon_threadsafe(callback)

此后,我想继续做其他事情,但是在执行此操作之前,我想确保回调已由B运行.有什么办法(除了标准线程同步原语之外)使A等待回调完成?我知道call_soon_threadsafe返回一个可以取消任务的asyncio.Handle对象,但是我不确定这是否可以用于等待(我对asyncio还是不太了解).

在这种情况下,此回调调用loop.close()并取消剩余的任务,然后在B中,在loop.run_forever()之后有一个loop.close().因此,对于这种用例,尤其是一种线程安全机制,该机制可使我从A知道何时有效地关闭了循环,这对我也同样适用-再次,不涉及互斥锁/条件变量/等.

我知道asyncio并不意味着线程安全,只有很少的例外,但是我想知道是否提供了一种方便的方法来实现这一点.

这是我的意思的一小段内容,以防万一.

import asyncio
import threading
import time

def thread_A():
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    handle = loop.call_soon_threadsafe(callback, loop)
    # How do I wait for the callback to complete before continuing?
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()

我尝试使用asyncio.run_coroutine_threadsafe进行此更改,但它不起作用,而是线程A永远挂起.不知道我做错了还是因为我正在停止循环.

import asyncio
import threading
import time

def thread_A():
    global future
    print('Thread A')
    loop = asyncio.new_event_loop()
    threading.Thread(target=thread_B, args=(loop,)).start()
    time.sleep(1)
    future = asyncio.run_coroutine_threadsafe(callback(loop), loop)
    future.result()  # Hangs here
    print('Thread A out')

def thread_B(loop):
    print('Thread B')
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print('Thread B out')

async def callback(loop):
    print('Stopping loop')
    loop.stop()

thread_A()

解决方法:

设置了回调,并且(大部分)忘记了.它们不适用于您需要从中获得结果的东西.这就是为什么生成的句柄仅允许您取消回调(不再需要此回调)的原因,仅此而已.

如果您需要在另一个线程中等待异步管理的协程的结果,请使用协程并将其作为任务计划为asyncio.run_coroutine_threadsafe();这将为您提供Future() instance,然后您可以等待完成.

但是,使用run_coroutine_threadsafe()停止循环确实需要循环处理比实际运行多一轮的回调.由run_coroutine_threadsafe()返回的Future将不会被通知其计划任务的状态更改.您可以通过在关闭循环之前通过线程B中的loop.run_until_complete()运行asyncio.sleep(0)来解决此问题:

def thread_A():
    # ... 
    # when done, schedule the asyncio loop to exit
    future = asyncio.run_coroutine_threadsafe(shutdown_loop(loop), loop)
    future.result()  # wait for the shutdown to complete
    print("Thread A out")

def thread_B(loop):
    print("Thread B")
    asyncio.set_event_loop(loop)
    loop.run_forever()
    # run one last noop task in the loop to clear remaining callbacks
    loop.run_until_complete(asyncio.sleep(0))
    loop.close()
    print("Thread B out")

async def shutdown_loop(loop):
    print("Stopping loop")
    loop.stop()

当然,这有点怪异,并且取决于回调管理和跨线程任务调度的内部结构是否保持不变.按照默认的asyncio实现立场,运行一个noop任务足以进行几轮回调,从而创建更多要处理的回调,但是替代循环实现可能对此进行不同的处理.

因此,对于关闭循环,使用基于线程的协调可能会更好:

def thread_A():
    # ...
    callback_event = threading.Event()
    loop.call_soon_threadsafe(callback, loop, callback_event)
    callback_event.wait()  # wait for the shutdown to complete
    print("Thread A out")

def thread_B(loop):
    print("Thread B")
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print("Thread B out")

def callback(loop, callback_event):
    print("Stopping loop")
    loop.stop()
    callback_event.set()

标签:python-asyncio,python
来源: https://codeday.me/bug/20191024/1924376.html