其他分享
首页 > 其他分享> > SpringBoot实现消息可靠传输

SpringBoot实现消息可靠传输

作者:互联网

 

   /*普通发送消息*/
    @Test
    void publishWithConfirm() throws IOException {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String s) {
                if(ack){
                    System.out.println("消息已经送达到交换机");
                }else {
                    System.out.println("消息没有送达到交换机 需要进行重试操作");
                }
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
        System.in.read();
    }

    @Test
    void publishWithReturn() throws IOException {
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {

                String msg = new String(message.getBody());
                System.out.println("消息:"+msg+"路由队列失败,需要做补救操作");

            }
        });
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,"big.black.dog","message");
        System.out.println("消息发送成功");
        System.in.read();
    }

    @Test
    void publishWithBaseProperties() throws IOException {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, "big.black.dog", "message", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //开启MQ持久化
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
        });
        System.out.println("消息发送成功");
        System.in.read();
    }
# 应用名称
spring.application.name=rabbitmq
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=192.168.1.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次去交换机拿10条消息
spring.rabbitmq.listener.simple.prefetch=10
#开启confirm机制
spring.rabbitmq.publisher-confirm-type=correlated
#开启return机制
spring.rabbitmq.publisher-returns=true

 

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE="boot-exchange";
    public static final String QUEUE="boot-queue";
    public static final String ROUTING_KEY="*.black.*";
    @Bean
    public Exchange bootExchange(){
        //channel.DeclareExchange
       return ExchangeBuilder.topicExchange(EXCHANGE).build();
    }
    @Bean
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }
    @Bean
    public Binding bootBinding(Exchange bootExchange,Queue bootQueue){
        return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
    }
}
@Component
public class ConsumerListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    //必须先写配置文件 关闭自动提交ack  msg 是string类型 也可以方其他对象  例如Person
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("队列消息为"+msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一标识为"+correlationId);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,false);
    }
}

 


 

标签:String,spring,System,rabbitmq,可靠,传输,message,public,SpringBoot
来源: https://www.cnblogs.com/Lcch/p/16491990.html