如何等待另一个线程提交的回调完成?
作者:互联网
我有两个共享状态的Python线程A和B.有一点,A提交了一个由B在其循环上运行的回调,如下所示:
# This line is executed by A
loop.call_soon_threadsafe(callback)
此后,我想继续做其他事情,但是在执行此操作之前,我想确保回调已由B运行.有什么办法(除了标准线程同步原语之外)使A等待回调完成?我知道call_soon_threadsafe返回一个可以取消任务的asyncio.Handle对象,但是我不确定这是否可以用于等待(我对asyncio还是不太了解).
在这种情况下,此回调调用loop.close()并取消剩余的任务,然后在B中,在loop.run_forever()之后有一个loop.close().因此,对于这种用例,尤其是一种线程安全机制,该机制可使我从A知道何时有效地关闭了循环,这对我也同样适用-再次,不涉及互斥锁/条件变量/等.
我知道asyncio并不意味着线程安全,只有很少的例外,但是我想知道是否提供了一种方便的方法来实现这一点.
这是我的意思的一小段内容,以防万一.
import asyncio
import threading
import time
def thread_A():
print('Thread A')
loop = asyncio.new_event_loop()
threading.Thread(target=thread_B, args=(loop,)).start()
time.sleep(1)
handle = loop.call_soon_threadsafe(callback, loop)
# How do I wait for the callback to complete before continuing?
print('Thread A out')
def thread_B(loop):
print('Thread B')
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print('Thread B out')
def callback(loop):
print('Stopping loop')
loop.stop()
thread_A()
我尝试使用asyncio.run_coroutine_threadsafe
进行此更改,但它不起作用,而是线程A永远挂起.不知道我做错了还是因为我正在停止循环.
import asyncio
import threading
import time
def thread_A():
global future
print('Thread A')
loop = asyncio.new_event_loop()
threading.Thread(target=thread_B, args=(loop,)).start()
time.sleep(1)
future = asyncio.run_coroutine_threadsafe(callback(loop), loop)
future.result() # Hangs here
print('Thread A out')
def thread_B(loop):
print('Thread B')
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print('Thread B out')
async def callback(loop):
print('Stopping loop')
loop.stop()
thread_A()
解决方法:
设置了回调,并且(大部分)忘记了.它们不适用于您需要从中获得结果的东西.这就是为什么生成的句柄仅允许您取消回调(不再需要此回调)的原因,仅此而已.
如果您需要在另一个线程中等待异步管理的协程的结果,请使用协程并将其作为任务计划为asyncio.run_coroutine_threadsafe()
;这将为您提供Future()
instance,然后您可以等待完成.
但是,使用run_coroutine_threadsafe()停止循环确实需要循环处理比实际运行多一轮的回调.由run_coroutine_threadsafe()返回的Future将不会被通知其计划任务的状态更改.您可以通过在关闭循环之前通过线程B中的loop.run_until_complete()运行asyncio.sleep(0)来解决此问题:
def thread_A():
# ...
# when done, schedule the asyncio loop to exit
future = asyncio.run_coroutine_threadsafe(shutdown_loop(loop), loop)
future.result() # wait for the shutdown to complete
print("Thread A out")
def thread_B(loop):
print("Thread B")
asyncio.set_event_loop(loop)
loop.run_forever()
# run one last noop task in the loop to clear remaining callbacks
loop.run_until_complete(asyncio.sleep(0))
loop.close()
print("Thread B out")
async def shutdown_loop(loop):
print("Stopping loop")
loop.stop()
当然,这有点怪异,并且取决于回调管理和跨线程任务调度的内部结构是否保持不变.按照默认的asyncio实现立场,运行一个noop任务足以进行几轮回调,从而创建更多要处理的回调,但是替代循环实现可能对此进行不同的处理.
因此,对于关闭循环,使用基于线程的协调可能会更好:
def thread_A():
# ...
callback_event = threading.Event()
loop.call_soon_threadsafe(callback, loop, callback_event)
callback_event.wait() # wait for the shutdown to complete
print("Thread A out")
def thread_B(loop):
print("Thread B")
asyncio.set_event_loop(loop)
loop.run_forever()
loop.close()
print("Thread B out")
def callback(loop, callback_event):
print("Stopping loop")
loop.stop()
callback_event.set()
标签:python-asyncio,python 来源: https://codeday.me/bug/20191024/1924376.html