其他分享
首页 > 其他分享> > Spring AMQP v1.4.2 – 网络故障时兔子重新连接问题

Spring AMQP v1.4.2 – 网络故障时兔子重新连接问题

作者:互联网

我正在测试Spring AMQP v1.4.2中的以下场景,并且在网络中断后无法重新连接:

>启动spring应用程序,使用rabbit:listener-container和rabbit:connection-factory(详细配置如下)异步使用消息.
>日志显示应用程序已成功接收消息.
>通过删除Rabbit服务器上的入站网络流量,使RabbitMQ对应用程序不可见:sudo iptables -A INPUT -p tcp –destination-port 5672 -j DROP
>等待至少3分钟(网络连接超时).
>修复连接:sudo iptables -D INPUT -p tcp –destination-port 5672 -j DROP
>等待一段时间(甚至尝试了一个多小时)并且没有重新连接.
>重新启动应用程序,它再次开始接收消息,这意味着网络恢复正常.

我还测试了与VM网络适配器断开连接而不是iptables drop相同的情况,同样的事情发生,即没有自动重新连接.有趣的是,当我尝试iptables REJECT而不是DROP时,它按预期工作,一旦我删除拒绝规则,应用程序就会重新启动,但我认为拒绝更像是服务器故障而不是网络故障.

根据reference document

If a MessageListener fails because of a business exception, the exception is handled by the message listener container and then it goes back to listening for another message. If the failure is caused by a dropped connection (not a business exception), then the consumer that is collecting messages for the listener has to be cancelled and restarted. The SimpleMessageListenerContainer handles this seamlessly, and it leaves a log to say that the listener is being restarted. In fact it loops endlessly trying to restart the consumer, and only if the consumer is very badly behaved indeed will it give up. One side effect is that if the broker is down when the container starts, it will just keep trying until a connection can be established.

这是断开连接后大约一分钟的日志:

    2015-01-16 14:00:42,433 WARN  [SimpleAsyncTaskExecutor-5] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:717) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:707) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:565) ~[amqp-client-3.4.2.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290) ~[na:1.7.0_55]
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) ~[amqp-client-3.4.2.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534) ~[amqp-client-3.4.2.jar:na]
    ... 1 common frames omitted

我在重新连接后几秒钟收到此日志消息:

2015-01-16 14:18:14,551 WARN  [SimpleAsyncTaskExecutor-2] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection timed out

更新:非常奇怪的是,当我在org.springframework.amqp包上启用DEBUG登录时,重新连接成功发生,我再也无法重现这个问题了!

在没有启用调试日志记录的情况下,我尝试调试spring AMQP代码.我观察到删除iptables drop后不久,调用SimpleMessageListenerContainer.doStop()方法,它调用shutdown()并取消所有通道.当我在doStop()上放置断点时,我也得到了这条日志消息,这似乎与原因有关:

2015-01-20 15:28:44,200 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:44,243 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Queue declaration failed; retries left=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) [spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2015-01-20 15:28:49,245 ERROR [pool-1-thread-16] org.springframework.amqp.rabbit.connection.CachingConnectionFactory Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'e4288669-2422-40e6-a2ee-b99542509273' in vhost '/', class-id=50, method-id=10)
2015-01-20 15:28:49,283 WARN  [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer Failed to declare queue:e4288669-2422-40e6-a2ee-b99542509273
2015-01-20 15:28:49,300 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:429) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1022) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[e4288669-2422-40e6-a2ee-b99542509273]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:486) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:401) ~[spring-rabbit-1.4.2.RELEASE.jar:na]
    ... 2 common frames omitted
2015-01-20 15:28:49,301 ERROR [SimpleAsyncTaskExecutor-3] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer Stopping container from aborted consumer

更新2:在将请求心跳设置为30秒之后,如答案所示,重新连接大部分时间都有效,并成功重新定义了绑定到扇出交换的独占临时队列,但它仍然无法偶尔重新连接.

在失败的极少数情况下,我在测试期间监视RabbitMQ管理控制台并观察到建立了新连接(在超时删除旧连接之后)但重新连接后未重新定义独占临时队列.客户端也没有收到任何消息.现在很难可靠地重现这个问题,因为它不常发生.我已经提供了下面的完整配置,现在包含队列声明.

更新3:即使用自动删除命名队列替换独占临时队列,偶尔也会出现相同的行为;即,重新连接后不重新定义自动删除命名队列,并且在重新启动应用程序之前不接收任何消息.

如果有人可以帮助我,我会非常感激.

这是我依赖的Spring AMQP配置:

<!-- Create a temporary exclusive queue to subscribe to the control exchange -->
<rabbit:queue id="control-queue"/>

<!-- Bind the temporary queue to the control exchange -->
<rabbit:fanout-exchange name="control">
    <rabbit:bindings>
        <rabbit:binding queue="control-queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Subscribe to the temporary queue -->
<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="none"
                           concurrency="1"
                           prefetch="1">
    <rabbit:listener queues="control-queue" ref="controlQueueConsumer"/>

</rabbit:listener-container>

<rabbit:connection-factory id="connection-factory"
                           username="${rabbit.username}"
                           password="${rabbit.password}"
                           host="${rabbit.host}"
                           virtual-host="${rabbit.virtualhost}"
                           publisher-confirms="true" 
                           channel-cache-size="100"
                           requested-heartbeat="30" />

<rabbit:admin id="admin" connection-factory="connection-factory"/>

<rabbit:queue id="qu0-id" name="qu0">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dead-letter"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange id="default-exchange" name="default-ex" declared-by="admin">
    <rabbit:bindings>
        <rabbit:binding queue="qu0" pattern="p.0"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

<rabbit:listener-container connection-factory="connection-factory"
                           acknowledge="manual"
                           concurrency="4"
                           prefetch="30">
    <rabbit:listener queues="qu0" ref="queueConsumerComponent"/>
</rabbit:listener-container>

解决方法:

我刚刚按照描述运行你的测试(使用iptables丢弃数据包的linux上的兔子).

重新建立连接时没有日志(也许我们应该).

我建议你打开调试日志以查看重新连接.

编辑:

来自rabbitmq文档:

exclusive
Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.

从你的例外:

reply-code=405, reply-text=RESOURCE_LOCKED – cannot obtain exclusive access to locked queue ‘e4288669-2422-40e6-a2ee-b99542509273’ in vhost ‘/’, class-id=50, method-

所以问题是经纪人仍然认为存在其他连接.

>不要使用独占队列(无论如何,您将丢失具有此类队列的消息).要么,
>设置低的requestedHeartbeat,以便代理更快地检测丢失的连接.

标签:spring-amqp,spring
来源: https://codeday.me/bug/20191007/1867805.html