python-如果事件循环已经在运行,如何等待协程在方法中同步完成?
作者:互联网
我正在尝试创建一个基于Python的CLI,该CLI通过websockets与Web服务进行通信.我遇到的一个问题是CLI间歇性地向Web服务发出的请求无法得到处理.查看来自Web服务的日志,我可以看到问题是由以下事实引起的:套接字关闭的同时(甚至在关闭之后)经常发出这些请求:
2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed
在我的CLI中,我定义了一个CommunicationService类,该类负责处理与Web服务的所有直接通信.在内部,它使用websockets
程序包来处理通信,该程序包本身是建立在asyncio之上的.
CommunicationService包含以下发送请求的方法:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
asyncio.ensure_future(self._ws.send(request))
…其中ws是先前通过另一种方法打开的websocket:
self._ws = await websockets.connect(websocket_address)
我想要的是能够等待asyncio.ensure_future返回的未来,并且如果有必要的话,可以在之后等待一小段时间,以便在关闭websocket之前为Web服务腾出时间来处理请求.
但是,由于send_request是同步方法,因此不能简单地等待这些将来.使它异步将是没有意义的,因为没有什么可以等待它返回的协程对象.我也不能使用loop.run_until_complete,因为循环在调用时已经在运行.
我发现有人描述了一个非常类似于我在mail.python.org处的问题.在该线程中发布的解决方案是,在循环已经运行的情况下,使该函数返回协程对象:
def aio_map(coro, iterable, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
coroutines = map(coro, iterable)
coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
if loop.is_running():
return coros
else:
return loop.run_until_complete(coros)
这对我来说是不可能的,因为我正在使用PyRx(反应性框架的Python实现),并且send_request仅作为可观察到的Rx的订户调用,这意味着返回值将被丢弃并且对我的代码不可用:
class AnonymousObserver(ObserverBase):
...
def _on_next_core(self, value):
self._next(value)
附带说明一下,我不确定这是否是asyncio经常遇到的某种问题,或者我是否没有得到它,但是我发现使用它非常令人沮丧.在C#中(例如),我所需要做的可能只是以下内容:
void SendRequest(string request)
{
this.ws.Send(request).Wait();
// Task.Delay(500).Wait(); // Uncomment If necessary
}
同时,asyncio的“ wait”版本只会返回另一个我被迫丢弃的协程.
更新资料
我找到了解决该问题的方法,该方法似乎可行.我有一个异步回调,该回调在命令执行后且CLI终止之前被执行,因此我从此进行了更改…
async def after_command():
await comms.stop()
…对此:
async def after_command():
await asyncio.sleep(0.25) # Allow time for communication
await comms.stop()
不过,我仍然很高兴收到有关此问题的任何答案,以供将来参考.在其他情况下,我可能无法依靠这种变通方法,并且我仍然认为,最好在send_request内部执行延迟,这样CommunicationService的客户端不必担心计时问题,这是更好的做法.
关于文森特的问题:
Does your loop run in a different thread, or is send_request called by some callback?
一切都在同一线程中运行-由回调调用.发生的是,我定义了所有命令以使用异步回调,并且在执行其中的某些命令后,它们将尝试向Web服务发送请求.由于它们是异步的,因此它们只有在通过CLI顶层的loop.run_until_complete调用执行后才执行此操作-这意味着循环将在执行和执行过程中途运行.发出此请求(通过间接调用send_request).
更新2
这是基于Vincent建议添加“完成”回调的解决方案.
将一个新的布尔字段_busy添加到CommunicationService,以表示是否正在进行通讯活动.
在发送请求之前,对CommunicationService.send_request进行了修改,以将_busy设置为true,然后为_ws.send提供回调以在完成后重置_busy:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
def callback(_):
self._busy = False
self._busy = True
asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
现在实现了CommunicationService.stop,以等待在继续进行操作之前将该标志设置为false:
async def stop(self) -> None:
"""
Terminate communications with TestCube Web Service.
"""
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
while self._busy:
await asyncio.sleep(0.1)
# Allow short delay after final request is processed.
await asyncio.sleep(0.1)
self._listen_task.cancel()
await asyncio.wait([self._listen_task, self._ws.close()])
self._listen_task = None
self._ws = None
logger.info('Terminated connection to TestCube Web Service')
这似乎也可行,并且至少以这种方式,所有通信时序逻辑都应按原样封装在CommunicationService类中.
更新3
基于Vincent提议的更好的解决方案.
而不是self._busy,我们有self._send_request_tasks = [].
新的send_request实现:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.append(task)
新的停止实施:
async def stop(self) -> None:
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
...
解决方法:
您可以使用一组任务:
self._send_request_tasks = set()
使用sure_future计划任务,并使用add_done_callback进行清理:
def send_request(self, request: str) -> None:
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.add(task)
task.add_done_callback(self._send_request_tasks.remove)
并等待一组任务完成:
async def stop(self):
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
标签:python-3-x,python-asyncio,python 来源: https://codeday.me/bug/20191118/2026213.html