Spring Boot AMQP Receiver
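I am creating a simple Spring Boot application that should receive messages sent to an AMQP (Rabbit) exchange by another application.
I get an error stating that the queue I want to receive from does not exist. When I look at http://localhost:15672/#/queues I can see that it does exist; I delete it before starting this application.
I read in another post that if the queue does not exist, you have to declare a RabbitAdmin and call declareExchange() and declareQueue() to get past this.
Even with this in place I still see the same error. The error is also confusing because of the name it reports:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'from-logger-exchange' in vhost '/', class-id=50, method-id=10)
Note that the "no queue" message refers to my exchange name.
Here is the config code (I realize Boot creates the connectionFactory and RabbitAdmin, but I started adding them when that wasn't working):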
import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Config {

    @Value("${spring.activemq.broker-url}")
    String host;

    @Value("${spring.activemq.user}")
    String userName;

    @Value("${spring.activemq.password}")
    String password;

    @Value("${config.exchangeName}")
    String exchangeName;

    @Value("${config.queueName}")
    String queueName;

    @PostConstruct
    public void printVariables() {
        System.out.println("host " + host);
        System.out.println("userName " + userName);
        System.out.println("password " + password);
        System.out.println("exchangeName " + exchangeName);
        System.out.println("queueName " + queueName);
        System.out.println("queue.name " + queue().getName());
        System.out.println("exchange.name " + topicExchange().getName());
        System.out.println("binding " + binding().toString());
        System.out.println("connectionFactory " + connectionFactory().toString());
        System.out.println("rabbitAdmin " + rabbitAdmin().toString());
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.declareExchange(topicExchange());
        rabbitAdmin.declareQueue(queue());
        return rabbitAdmin;
    }

    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    // @Bean
    // public FanoutExchange fanoutExchange() {
    //     return new FanoutExchange(exchangeName);
    // }

    public TopicExchange topicExchange() {
        return new TopicExchange(exchangeName, false, true);
    }

    @Bean
    public Binding binding() {
        //return BindingBuilder.bind(queue()).to(fanoutExchange());
        return BindingBuilder.bind(queue()).to(topicExchange()).with("my.routing.key");
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
Here is the Receiver:

import org.springframework.amqp.rabbit.annotation.RabbitListener;

public class Receiver {

    @RabbitListener(queues="${config.exchangeName}")
    public void receive(String input) {
        System.out.println(" Receiver#receive input: " + input);
    }
}
Here is the first part of the console output:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.2.RELEASE)
2016-02-23 10:20:47.710 INFO 18804 --- [ main] c.i.logging.receiver.Application : Starting Application on INT002520 with PID 18804 (C:\Users\matt.aspen\Documents\_logging\log-message-receiver2\target\classes started by matt.aspen in C:\Users\matt.aspen\Documents\_logging\log-message-receiver2)
2016-02-23 10:20:47.710 INFO 18804 --- [ main] c.i.logging.receiver.Application : No active profile set, falling back to default profiles: default
2016-02-23 10:20:47.710 DEBUG 18804 --- [ main] o.s.boot.SpringApplication : Loading source class com.intelligrated.logging.receiver.Application
2016-02-23 10:20:47.750 DEBUG 18804 --- [ main] o.s.b.c.c.ConfigFileApplicationListener : Loaded config file 'classpath:/application.properties'
2016-02-23 10:20:47.750 DEBUG 18804 --- [ main] o.s.b.c.c.ConfigFileApplicationListener : Skipped (empty) config file 'classpath:/application.properties' for profile default
2016-02-23 10:20:47.750 INFO 18804 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@679b62af: startup date [Tue Feb 23 10:20:47 PST 2016]; root of context hierarchy
2016-02-23 10:20:48.482 INFO 18804 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$68858653] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
host localhost
userName guest
password guest
exchangeName from-logger-exchange
queueName from-logger-queue
queue.name from-logger-queue
exchange.name from-logger-exchange
binding Binding [destination=from-logger-queue, exchange=from-logger-exchange, routingKey=my.routing.key]
connectionFactory CachingConnectionFactory [channelCacheSize=1, host=localhost, port=5672, active=true connectionFactory]
2016-02-23 10:20:48.642 INFO 18804 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@6239aba6 [delegate=amqp://guest@127.0.0.1:5672/]
rabbitAdmin org.springframework.amqp.rabbit.core.RabbitAdmin@5f354bcf
2016-02-23 10:20:48.753 DEBUG 18804 --- [ main] inMXBeanRegistrar$SpringApplicationAdmin : Application Admin MBean registered with name 'org.springframework.boot:type=Admin,name=SpringApplication'
2016-02-23 10:20:48.998 INFO 18804 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-02-23 10:20:49.208 INFO 18804 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2016-02-23 10:20:49.208 INFO 18804 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2016-02-23 10:20:49.218 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue:from-logger-exchange
2016-02-23 10:20:49.228 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[from-logger-exchange]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1171) [spring-rabbit-1.5.3.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_40]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) ~[amqp-client-3.5.7.jar:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_40]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_40]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_40]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_40]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:764) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
at com.sun.proxy.$Proxy31.queueDeclarePassive(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
... 3 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'from-logger-exchange' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.5.7.jar:na]
... 12 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'from-logger-exchange' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.5.7.jar:na]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554) ~[amqp-client-3.5.7.jar:na]
... 1 common frames omitted
2016-02-23 10:20:54.226 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue:from-logger-exchange
2016-02-23 10:20:54.226 WARN 18804 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=2
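What is even more confusing is that when I look at http://localhost:15672/#/connections I see the connection, at http://localhost:15672/#/channels I see the channel, at http://localhost:15672/#/exchanges I see "from-logger-exchange", and at http://localhost:15672/#/queues I see "from-logger-queue".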
Answer:
One problem is that your exchange is not a @Bean.
With your config, I get
reply-code=404, reply-text=NOT_FOUND - no exchange 'from.logging.exchange' in vhost '/', class-id=50, method-id=20)
Adding @Bean to the exchange fixes it.
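A minimal sketch of that fix, under two assumptions that the answer does not state: Boot's auto-configured ConnectionFactory and RabbitAdmin are used (the question notes that Boot creates them), and the connection settings come from Boot's spring.rabbitmq.* properties instead of the spring.activemq.* keys in the question. A RabbitAdmin declares the Queue, Exchange, and Binding beans it finds in the application context once a connection opens, so the exchange has to be a bean to be declared together with the binding that references it. In AMQP 0-9-1, class-id=50, method-id=20 is queue.bind, which fits a binding that points at an exchange the broker does not have.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: assumes Boot auto-configures the ConnectionFactory and
// RabbitAdmin, so neither is defined by hand here.
@Configuration
public class Config {

    @Value("${config.exchangeName}")
    String exchangeName;

    @Value("${config.queueName}")
    String queueName;

    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    // The change from the question: the exchange is now a bean, so the
    // RabbitAdmin can discover and declare it.
    @Bean
    public TopicExchange topicExchange() {
        // Same arguments as in the question: not durable, auto-delete.
        return new TopicExchange(exchangeName, false, true);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with("my.routing.key");
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}
```

With the declarations left to the admin, the explicit declareExchange() and declareQueue() calls from the question should no longer be needed.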
EDIT
Also (as Artem points out below), you have the wrong property on the listener:
@RabbitListener(queues="${config.exchangeName}")
should be
@RabbitListener(queues="${config.queueName}")
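Why the original error named the exchange can be illustrated with a toy model. This is a hypothetical in-memory stand-in, not the RabbitMQ client or Spring AMQP API; the class ToyBroker and its methods are invented for illustration. It only shows that exchanges and queues are looked up separately, so asking for a queue by an exchange's name fails even though the exchange exists.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model (hypothetical, not a real broker API): exchange names and
// queue names are tracked in separate sets.
class ToyBroker {
    private final Set<String> exchanges = new HashSet<>();
    private final Set<String> queues = new HashSet<>();

    void declareExchange(String name) {
        exchanges.add(name);
    }

    void declareQueue(String name) {
        queues.add(name);
    }

    // Mimics a passive queue declaration: only the queue set is consulted,
    // so an exchange with the same name does not count.
    String passiveDeclareQueue(String name) {
        if (!queues.contains(name)) {
            return "NOT_FOUND - no queue '" + name + "' in vhost '/'";
        }
        return "OK";
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declareExchange("from-logger-exchange");
        broker.declareQueue("from-logger-queue");

        // What the listener did with queues="${config.exchangeName}":
        System.out.println(broker.passiveDeclareQueue("from-logger-exchange"));
        // What it does with queues="${config.queueName}":
        System.out.println(broker.passiveDeclareQueue("from-logger-queue"));
    }
}
```

Running main prints the NOT_FOUND text for 'from-logger-exchange' and then OK for 'from-logger-queue', mirroring the difference between the two annotations above.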
Tags: spring, spring-boot-2, spring-amqp. Source: https://codeday.me/bug/20190711/1431830.html