SpringBoot实现消息可靠传输
作者:互联网
/*普通发送消息*/ @Test void publishWithConfirm() throws IOException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { if(ack){ System.out.println("消息已经送达到交换机"); }else { System.out.println("消息没有送达到交换机 需要进行重试操作"); } } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message"); System.out.println("消息发送成功"); System.in.read(); } @Test void publishWithReturn() throws IOException { rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { String msg = new String(message.getBody()); System.out.println("消息:"+msg+"路由队列失败,需要做补救操作"); } }); rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message"); System.out.println("消息发送成功"); System.in.read(); } @Test void publishWithBaseProperties() throws IOException { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //开启MQ持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; } }); System.out.println("消息发送成功"); System.in.read(); }
# Application name
spring.application.name=rabbitmq
# Web server port
server.port=8080
# RabbitMQ connection settings
spring.rabbitmq.host=192.168.1.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Manual acknowledgement: the listener must ack each message itself
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Prefetch: at most 10 unacknowledged messages outstanding per consumer
spring.rabbitmq.listener.simple.prefetch=10
# Enable the publisher confirm mechanism
spring.rabbitmq.publisher-confirm-type=correlated
# Enable the publisher return mechanism
spring.rabbitmq.publisher-returns=true
@Configuration public class RabbitMQConfig { public static final String EXCHANGE="boot-exchange"; public static final String QUEUE="boot-queue"; public static final String ROUTING_KEY="*.black.*"; @Bean public Exchange bootExchange(){ //channel.DeclareExchange return ExchangeBuilder.topicExchange(EXCHANGE).build(); } @Bean public Queue bootQueue(){ return QueueBuilder.durable(QUEUE).build(); } @Bean public Binding bootBinding(Exchange bootExchange,Queue bootQueue){ return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs(); } }
@Component public class ConsumerListener { @RabbitListener(queues = RabbitMQConfig.QUEUE) //必须先写配置文件 关闭自动提交ack msg 是string类型 也可以方其他对象 例如Person public void consume(String msg, Channel channel, Message message) throws IOException { System.out.println("队列消息为"+msg); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("唯一标识为"+correlationId); long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,false); } }
标签:String,spring,System,rabbitmq,可靠,传输,message,public,SpringBoot 来源: https://www.cnblogs.com/Lcch/p/16491990.html