RabbitMq学习
作者:互联网
1、概念
amqp:协议
Provider:生产者
Consumer:消费者
Broker:接收和分发消息的应用 RabbitMQ Server
virtual:虚拟机
把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概
念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出
多个vhost,每个用户在自己的vhost创建exchange/queue等
Exchange:交换器
消息交换机,它指定消息按什么规则,路由到哪个队列
常用的交换器:
DirectExchange:交换机在接收到消息后,通过路由键路由到指定的队列
FanoutExchange:交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
TopicExchange:与DirectExchange相似,路由键支持模糊匹配(*,#)
Queue:队列
存放消息的实体
2、配置
spring:
application:
name: rabbit
rabbitmq:
host: local
port: 5672
username: root
password: 123
virtual-host: default
3、消息确认以及通知
RabbitMQ引入发送端消息确认机制,主要通过事务和publisher Confirm机制
3.1、事务
/**
* 配置启用rabbitmq事务
* @param connectionFactory
* @return
*/
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@PostConstruct
private void init() {
//启用事务模式,不能开确认回调
//rabbitTemplate.setConfirmCallback(this);
//rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void sendIngateQueue(TradePayModelRes msg) {
logger.info("进闸消息已发送 {}",msg.getOutTradeNo());
rabbitTemplate.convertAndSend(exchange,ingateQueue,msg);
}
3.2、Confirm机制
先加上配置,开始confirm机制
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.template.mandatory=true
@Component
public class BaseRabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseRabbitConfig.class);
// @Resource
// private RedisConfig redisConfig;
@Resource
private RedisTemplate<String, String> redisTemplate;
@Resource
private MsgLogMapper msgLogMapper;
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setConfirmCallback(this);
//rabbitTemplate.setMandatory(true);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback(this);
return rabbitTemplate;
}
//消息发送到Exchange的时候会进行判断
//这里会有一个问题需要注意,就是confirm回productor可能会有延迟,意思就是可以消费者已近就收到了消息,生产者才收到confirm的消息
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack) {
//成功。。。。。
} else {
//失败。。。。。
}
}
//只有在exchange路由到queue失败的时候才会调用
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
}
}
3.3、消费者的ack机制
acknowledge-mode: manual/none/auto 手动/关闭/自动
manual:
根据我们的实际需求,手动进行ack
none:
不进行ack
auto:
消费者监听到消息后马上进行ack
ack有重试机制,默认的重试次数为3次,可以通过配置进行改动,这里需要注意的事,如果异常被catch 了,没有跑出来的话,重试次数和重试时间间隔的配置将会失效,没有处理好的话会出现死循环。(如果又要catch处理又要指定重试次数,可以使用redis)
spring.rabbitmq.listener.simple.retry.enabled = true //是否开启重试
spring.rabbitmq.listener.simple.retry.max-attempts = 3 //重试次数
spring.rabbitmq.listener.simple.retry.initial-interval = 2000 //每次重试的时间间隔
手动ack(下文的返回队列,意味着会进行重试)
注意:开启ack,必须要对消息进行ack不然消息就会阻塞,后果很严重
//消息消费成功,第一个参数为服务给消息的唯一标志,第二个参数为是否批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//消息消费失败,第二个参数为是否重新返回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//消息消费失败,第二个参数为是否批量处理,第三个为是否重新返回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
监听方法的定义一般如下
@RabbitListener(queues = {"mail.send.queue"})
public void sendMail(Message message, Channel channel) throws IOException {
//。。。。。
}
标签:rabbitTemplate,重试,ack,rabbitmq,RabbitMq,学习,消息,true 来源: https://blog.csdn.net/weixin_45062785/article/details/117323576