其他分享
首页 > 其他分享> > RabbitMq学习

RabbitMq学习

作者:互联网

1、概念

amqp:协议
Provider:生产者
Consumer:消费者
Broker:接收和分发消息的应用 RabbitMQ Server
virtual:虚拟机
把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概
念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出
多个vhost,每个用户在自己的vhost创建exchange/queue等
Exchange:交换器
消息交换机,它指定消息按什么规则,路由到哪个队列
常用的交换器:
DirectExchange:交换机在接收到消息后,通过路由键路由到指定的队列
FanoutExchange:交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
TopicExchange:与DirectExchange相似,路由键支持模糊匹配(*,#)
Queue:队列
存放消息的实体

2、配置

spring:
  application:
    name: rabbit
  rabbitmq:
    host: local
    port: 5672
    username: root
    password: 123
    virtual-host: default

3、消息确认以及通知

RabbitMQ引入发送端消息确认机制,主要通过事务和publisher Confirm机制

3.1、事务

/**
 * 配置启用rabbitmq事务
 * @param connectionFactory
 * @return
 */
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}
 
	@PostConstruct
private void init() {
    //启用事务模式,不能开确认回调
	//rabbitTemplate.setConfirmCallback(this);
   //rabbitTemplate.setReturnCallback(this);
	rabbitTemplate.setChannelTransacted(true);
	rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}

@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void sendIngateQueue(TradePayModelRes msg) {
	logger.info("进闸消息已发送 {}",msg.getOutTradeNo());
	rabbitTemplate.convertAndSend(exchange,ingateQueue,msg);
}

3.2、Confirm机制

先加上配置,开始confirm机制

spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.template.mandatory=true
@Component
public class BaseRabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseRabbitConfig.class);

//    @Resource
//    private RedisConfig redisConfig;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private MsgLogMapper msgLogMapper;

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setConfirmCallback(this);
        //rabbitTemplate.setMandatory(true);
        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }

    //消息发送到Exchange的时候会进行判断
    //这里会有一个问题需要注意,就是confirm回productor可能会有延迟,意思就是可以消费者已近就收到了消息,生产者才收到confirm的消息
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
            if (ack) {
                //成功。。。。。
            } else {
                //失败。。。。。
            }
    }
    
    //只有在exchange路由到queue失败的时候才会调用
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

    }
}

3.3、消费者的ack机制

acknowledge-mode: manual/none/auto 手动/关闭/自动
manual:
根据我们的实际需求,手动进行ack
none:
不进行ack
auto:
消费者监听到消息后马上进行ack

ack有重试机制,默认的重试次数为3次,可以通过配置进行改动,这里需要注意的事,如果异常被catch 了,没有跑出来的话,重试次数和重试时间间隔的配置将会失效,没有处理好的话会出现死循环。(如果又要catch处理又要指定重试次数,可以使用redis)

spring.rabbitmq.listener.simple.retry.enabled = true //是否开启重试
spring.rabbitmq.listener.simple.retry.max-attempts = 3 //重试次数
spring.rabbitmq.listener.simple.retry.initial-interval = 2000 //每次重试的时间间隔

手动ack(下文的返回队列,意味着会进行重试)
注意:开启ack,必须要对消息进行ack不然消息就会阻塞,后果很严重

//消息消费成功,第一个参数为服务给消息的唯一标志,第二个参数为是否批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//消息消费失败,第二个参数为是否重新返回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//消息消费失败,第二个参数为是否批量处理,第三个为是否重新返回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

监听方法的定义一般如下

@RabbitListener(queues = {"mail.send.queue"})
public void sendMail(Message message, Channel channel) throws IOException {
    //。。。。。
    }

标签:rabbitTemplate,重试,ack,rabbitmq,RabbitMq,学习,消息,true
来源: https://blog.csdn.net/weixin_45062785/article/details/117323576