rabbitMQ--死信队列
作者:互联网
方式一：消费者拒绝（reject）消息，并且 requeue 设置为 false 时，消息进入死信队列
# 应用名称
spring.application.name=rabbitmq
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=192.168.1.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 每次从队列中预取10条消息
spring.rabbitmq.listener.simple.prefetch=10
# 开启confirm机制
spring.rabbitmq.publisher-confirm-type=correlated
# 开启return机制
spring.rabbitmq.publisher-returns=true
/**
 * Declares a "normal" topic exchange/queue pair and a "dead" topic
 * exchange/queue pair. The normal queue is declared with the dead exchange as
 * its dead-letter exchange, using {@link #DEAD_ROUTING_KEY} as the dead-letter
 * routing key.
 */
@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE = "normal-exchange";
    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE = "dead-exchange";
    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    /** Topic exchange that producers publish regular messages to. */
    @Bean
    public Exchange normalExchange() {
        return topicExchangeNamed(NORMAL_EXCHANGE);
    }

    /**
     * Durable normal queue. This queue has to be tied to the dead-letter
     * exchange, which is done through its dead-letter arguments.
     */
    @Bean
    public Queue normalQueue() {
        QueueBuilder builder = QueueBuilder.durable(NORMAL_QUEUE);
        builder.deadLetterExchange(DEAD_EXCHANGE);
        builder.deadLetterRoutingKey(DEAD_ROUTING_KEY);
        return builder.build();
    }

    /** Binds the normal queue to the normal exchange with {@link #NORMAL_ROUTING_KEY}. */
    @Bean
    public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
        return bindWithKey(normalQueue, normalExchange, NORMAL_ROUTING_KEY);
    }

    /** Topic exchange used as the dead-letter exchange of the normal queue. */
    @Bean
    public Exchange deadExchange() {
        return topicExchangeNamed(DEAD_EXCHANGE);
    }

    /** Durable queue bound to the dead exchange. */
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /** Binds the dead queue to the dead exchange with {@link #DEAD_ROUTING_KEY}. */
    @Bean
    public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
        return bindWithKey(deadQueue, deadExchange, DEAD_ROUTING_KEY);
    }

    /** Builds a topic exchange with the given name using the builder defaults. */
    private static Exchange topicExchangeNamed(String exchangeName) {
        return ExchangeBuilder.topicExchange(exchangeName).build();
    }

    /** Builds an argument-less binding of {@code queue} to {@code exchange} under {@code routingKey}. */
    private static Binding bindWithKey(Queue queue, Exchange exchange, String routingKey) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
    }
}
/**
 * Listener on the normal queue that rejects every delivery without requeueing
 * it, so that the message goes to the dead-letter queue.
 */
@Component
public class DeadListener {

    /**
     * Prints the received payload and then rejects the delivery.
     *
     * @param msg     the message payload as a string
     * @param channel the channel the delivery arrived on; used for the manual reject
     * @param message the raw message, from which the delivery tag is read
     * @throws IOException if the reject call on the channel fails
     */
    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
    public void consumer(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到normal队列消息" + msg);

        // Reject with requeue=false: the message is not put back on the normal queue.
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        final boolean requeue = false;
        channel.basicReject(deliveryTag, requeue);
    }
}
/**
 * Dead-letter producer: publishes one message to the normal exchange with the
 * routing key {@code normal.abc}.
 *
 * @throws IOException if reading from standard input fails
 */
@Test
public void publish() throws IOException {
    final String routingKey = "normal.abc";
    final String payload = "dead letter";
    rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, routingKey, payload);

    // Block on stdin; presumably this keeps the test process alive while the
    // listener handles the message.
    System.in.read();
}
--------
方式二：基于消息的生存时间（TTL），消息过期后进入死信队列
/*死信发送者*/
@Test public void publishExpire() throws IOException { String msg="dead letter Expire"; rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("50000");//5秒钟之内没有将消息消费掉 那么消息将进入死信队列中 return message; } }); System.in.read(); }
最好不要用这种方式（在单条消息上）设置生存时间，因为 RabbitMQ 只会检查队列头部的消息是否过期。举例来说：msg1 的 TTL 为 5s，msg2 的 TTL 为 10s，
如果 msg2 先到达队列、msg1 后到达，RabbitMQ 会先等待队首的 msg2 过期（10s），然后才会检查 msg1 的 5 秒；此时 msg1 实际需要约 10 秒（而不是 5 秒）才会进入死信队列，期间也不会被正常消费
为了解决上述问题，我们需要引入延迟交换机插件，插件下载地址：
https://www.rabbitmq.com/community-plugins.html
下载包 支持3.8.5 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
# Install the delayed-message-exchange plugin into the container named "rabbitmq".

# Copy the downloaded plugin archive into the container's plugin directory.
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/opt/rabbitmq/plugins
# Open an interactive shell inside the container; the commands up to "exit" are typed in that shell.
docker exec -it rabbitmq bash
# Move to the plugin directory, then to the sibling sbin directory (/opt/rabbitmq/sbin).
cd /opt/rabbitmq/plugins
cd ../sbin/
# Enable the plugin.
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Leave the container shell.
exit
# Restart the container.
docker restart rabbitmq
这种方式下，生产者把消息发送到延迟交换机后，消息会先暂存在交换机中，到达指定的延迟时间后才会被路由到对应的队列。通过这种方式我们可以更方便地实现延迟消费，解决上面的问题
/**
 * Publishes the payload {@code "xxx"} to the delayed-message exchange with the
 * routing key {@code delayed.ach}, setting a delay of 3000 on the message
 * (presumably milliseconds, as used by the delayed-exchange plugin; confirm).
 */
@Test
public void DelayedPublish() {
    // Post-processor that stamps the delay onto the outgoing message.
    MessagePostProcessor applyDelay = outgoing -> {
        outgoing.getMessageProperties().setDelay(3000);
        return outgoing;
    };

    rabbitTemplate.convertAndSend(
            XDelayedMessageConfig.DelayedMessage_EXCHANGE,
            "delayed.ach",
            "xxx",
            applyDelay);
}
/**
 * Declares the delayed-message exchange, a durable queue, and the binding
 * between them.
 */
@Configuration
public class XDelayedMessageConfig {

    public static final String DelayedMessage_EXCHANGE = "DelayedMessage-exchange";
    public static final String DelayedMessage_QUEUE = "DelayedMessage-queue";
    public static final String DelayedMessage_ROUTING_KEY = "delayed.#";

    /**
     * Delayed exchange: a custom exchange of type {@code x-delayed-message}
     * whose {@code x-delayed-type} argument is {@code topic}.
     */
    @Bean
    public Exchange delayedExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;

        Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("x-delayed-type", "topic");

        return new CustomExchange(
                DelayedMessage_EXCHANGE, "x-delayed-message", durable, autoDelete, exchangeArgs);
    }

    /** Durable queue that receives messages from the delayed exchange. */
    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(DelayedMessage_QUEUE).build();
    }

    /** Binds the delayed queue to the delayed exchange with {@link #DelayedMessage_ROUTING_KEY}. */
    @Bean
    public Binding delayedBinding(Exchange delayedExchange, Queue delayedQueue) {
        return BindingBuilder
                .bind(delayedQueue)
                .to(delayedExchange)
                .with(DelayedMessage_ROUTING_KEY)
                .noargs();
    }
}
延迟交换机
------------
标签:return,String,--,spring,rabbitMQ,死信,rabbitmq,message,public 来源: https://www.cnblogs.com/Lcch/p/16493281.html