Spring Boot中集成 RabbitMQ
作者:互联网
- SpringBoot 集成 RabbitMQ
<!-- 在 pom 文件中添加 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置
#修改 application.properties 文件
# 配置集群的VIP # 192.168.174.150:5672
spring.rabbitmq.addresses=192.168.174.150:5672
# 配置真实IP也可以
#spring.rabbitmq.addresses=192.168.174.140:5672,192.168.174.141:5672
# 配置账号
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.connection-timeout=15000
@Configuration
public class RabbitConfig {
private Logger log = LoggerFactory.getLogger(RabbitConfig.class);
// 这里配置,配置文件就不生效了
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
// 生产者:confirm 模式 , 默认是 false
connectionFactory.setPublisherConfirms(true);
// 生产者:return机制 , 默认是 false
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 生产者: 与return机制结合配置次属性
rabbitTemplate.setMandatory(true);
// 发送确认消息,在生产者发送消息后,需要对rabbitTemplate设置ConfirmCallback对象
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("消息回调id:" + correlationData);
if (ack) {
log.info("消息成功消费");
} else {
log.info("消息消费失败:" + cause);
}
}
// 启动消息失败返回,比如路由不到队列时触发回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
}
-
自动确认消息
@Component public class HelloProductor { // 生产者 @Autowired private RabbitTemplate rabbitTemplate; // 简单对列的情况下routingKey 即为Queue名 public void sendMsg(String content) { rabbitTemplate.convertAndSend("q_hello", context); } }
@Component @RabbitListener(queues = "q_hello") // 队列名 public class HelloReceiver { // 消费者 @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
-
# 消费者:手动应答 , 默认是自动应答
spring.rabbitmq.listener.simple.acknowledge-mode=manual
subject.exchange=subject.exchange
subject.key=subject.key
subject.queue=subject.queue
@Component public class HelloProductor2 { // 生产者 @Autowired private RabbitTemplate rabbitTemplate; @Value("${subject.exchange}") private String subjectAwardExchange; @Value("${subject.key}") private String subjectAwardKey; public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(subjectExchange,subjectKey, content, correlationId); // 简单对列的情况下routingKey即为Q名 // rabbitTemplate.convertAndSend("q_hello", context); } }
@RabbitListener( bindings=@QueueBinding( value=@Queue(value="${subject.queue}",autoDelete="false"), exchange=@Exchange(value="${subject.exchange}",type=ExchangeTypes.DIRECT), key="${subject.key}" ) ) public class HelloReceiver2 { // 消费者 @RabbitHandler public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { System.out.println(message); try { channel.basicAck(tag,false); // 手动确认消息 } catch (IOException e) { e.printStackTrace(); } } }
-
@Configuration public class RabbitConfig { public static final String DELAY_QUEUE = "delay.queue"; // 延迟队列TTL名称 // DLX,死信发送到的交换机 public static final String DELAY_EXCHANGE = "delay.exchange"; public static final String DELAY_ROUTING_KEY = "delay.exchange"; // 路由名称 public static final String QUEUE_NAME = "queue_name"; public static final String EXCHANGE_NAME = "exchange_name"; public static final String ROUTING_KEY = "exchange_name"; @Bean public Queue delayQueue() { Map<String, Object> params = new HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", EXCHANGE_NAME); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", ROUTING_KEY); return new Queue(DELAY_QUEUE, true, false, false, params); } @Bean public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE); } @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()) .to(delayExchange()) .with(DELAY_ROUTING_KEY); } @Bean public Queue registerQueue() { return new Queue(QUEUE_NAME, true); } @Bean public DirectExchange registerExchange() { return new DirectExchange(EXCHANGE_NAME); } @Bean public Binding registerBinding() { return BindingBuilder.bind(registerQueue()) .to(registerExchange()) .with(ROUTING_KEY); } }
@Component public class HelloProductor3 { // 生产者 @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg) { int delayTime = 60000; // 1分钟延时 : 60 * 1000 rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, msg, message -> { log.info("发送订单创建消息:{}", msg); message.getMessageProperties().setExpiration(String.valueOf(delayTime)); return message; }); } }
@Component @RabbitListener(queues = {RabbitConfig.QUEUE_NAME}) public class HelloReceiver3 { // 消费者 @RabbitHandler public void listenerQueue(String msg, Message message, Channel channel) throws IOException { System.out.println(MessageFormat.format("[监听的消息] - [消费时间]: [{0}] - [{1}]", LocalDateTime.now(), msg)); try{ // TODO 通知MQ 已经被消费完成 可以ACK了 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { //TODO 处理失败,重新压入MQ channel.basicRecover(); e.printStackTrace(); } } }
标签:rabbitTemplate,return,String,exchange,Spring,Boot,RabbitMQ,public,subject 来源: https://blog.csdn.net/justfamily/article/details/104050907