RabbitMQ(二)
作者:互联网
(1) MQ的概念
Message Queue :消息队列,存放消息的一个容器(先进先出)
(2)MQ的优点
解耦 、 异步提速 、 削峰填谷
(3)MQ的缺点
可用性减弱 、复杂性提供、处理一致性
MQ是需要成本的,适合的地方使用。
(4)MQ的实现
RabbitMQ(erlang AMQP协议)
ActiveMQ(java JMS接口规范)
RocketMQ(java)
Kafka(大数据)
(5)MQ的工作模式
简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式
(6)编写代码
JAVA
ConnectionFactory
host / port / username/ password/ virtualHost
Connection
Channel
Channel.exchangeDeclare(exchangeName,exchangeType[fanout\direct\topic])
Channel.queueDeclare(queueName,true,false,false,null);
Channel.queueBind(queue,exchange,routingKey);
Channel.basicPublish(exchange,routingKey,null,body);
Consumer consumer = new DefaultConsumer(channel){
handleDelivery…
//业务
}
Channel.basicConsume(queueName,true,consumer)
Spring整合
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="" port="" username="" password="" virtual-host=""/>
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 队列 -->
<rabbit:queue id="" name=""></rabbit:queue>
<!--交换机 -->
<rabbit:fanout-exchange name="">
<rabbit:bindings>
<rabbit:binding queue=""></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:direct-exchange name="">
<rabbit:bindings>
<rabbit:binding key="" queue=""></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:topic-exchange name="">
<rabbit:bindings>
<rabbit:binding pattern="" queue=""></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--rabbitTemplate.convertAndSend(exchangeName,routingKey,body)-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="监听器类"/>
</rabbit:listener-container>
<!--
方式一
<bean id="" class=""/>
方式二
<context:component-scan base-package="" />
@Component
public class 监听器类 implements MessageListener{
onMessage(Message message){
//业务操作
}
}
-->
</beans>
SpringBoot整合RabbitMQ
(1)创建一个maven项目(jar)
(2)pom.xml添加起步依赖
spring-boot-starter-amqp
(3)编写引导类
(4)application.yml
spring:
rabbitmq:
host: 192.168.222.130
port: 5672
virtual-host: /longmarch
username: 345078978
password: 123456
(5)通过注解来完成交换机、队列、绑定关系的声明?
@Configuration
@Bean
QueueBuilder
ExchangeBuilder
BindingBuilder
package live.longmarch.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @PackageName: live.longmarch.rabbitmq.config
* @ClassName: RabbitMQConfig
* @Author: raven
* @Date: 2020/1/11 20:56
* @Blame: liunian
* @Description:
*/
@Configuration
public class RabbitMQConfig {
/**
* 交换机名称
*/
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
/**
* 队列名称
*/
public static final String ITEM_QUEUE = "item_queue";
/**
* // 声明交换机
* @return 返回交换机 设置交换机的名称,设置可以持久化
*/
@Bean("itemTopicExchange")
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
/**
* 声明队列
* @return 返回队列,设置队列的名称
*/
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
/**
* 绑定队列和交换机的关系
* @param queue 队列
* @param exchange 交换机
* @return 返回绑定关系
*/
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("itemTopicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
(6)发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend(exchangeName,routingKey,body);
package live.longmarch.rabbitmq;
/**
* @PackageName: live.longmarch.rabbitmq
* @ClassName: RabbitMQTest
* @Author: raven
* @Date: 2020/1/11 21:10
* @Blame: liunian
* @Description:
*/
import live.longmarch.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 发送消息
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.insert", "商品新增,routing key 为item.insert");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.update", "商品修改,routing key 为item.update");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE, "item.delete", "商品删除,routing key 为item.delete");
}
}
(7)接收消息
@RabbitListener
public void method(Message message){
//message.getBody()--->byte[]
}
package live.longmarch.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @PackageName: live.longmarch.rabbitmq.listener
* @ClassName: MyListener
* @Author: raven
* @Date: 2020/1/12 8:56
* @Blame: liunian
* @Description:
*/
@Component
public class MyListener {
@RabbitListener(queues = "item_queue")
public void myListener1(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}
生产者工程:
application.yml文件配置RabbitMQ相关信息;
在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
application.yml文件配置RabbitMQ相关信息
创建消息处理类,用于接收队列中的消息并进行处理
消息的可靠性
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式
rabbitmq 整个消息投递的路径为:
producer —> rabbitmq broker —> exchange —> queue —> consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
让生产者能够知道消息是否发送到了Exchange
消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
把交换机发送消息到队列中失败的消息结果告诉生产者
Confirm确认模式
/**
* 确认模式:
* 步骤:
* 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
* 2. 在rabbitTemplate定义ConfirmCallBack回调函数
*/
(1)配置文件中
<rabbit:connection-factory id="connectionFactory"
host="192.168.200.131"
port="5672"
username="guest"
password="guest"
virtual-host="/"
publisher-confirms="true" />
(2)发送消息的时候,通过RabbitTemplate来设置确认回调
@Test
public void testConfirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//ack:是否成功
//cause:日志记录
if (ack) {
//接收成功
System.out.println("接收成功消息" + cause);
} else {
//接收失败
System.out.println("接收失败消息" + cause);
//做一些处理,让消息再次发送。
}
}
});
rabbitTemplate.convertAndSend(
"test_exchange_confirm","test_routingKey_confirm3","message confirm...");
}
Return回退模式
/**
* 步骤:
* 1. 开启回退模式:publisher-returns="true"
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃消息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
*/
(1)配置文件中
<rabbit:connection-factory id="connectionFactory"
host="192.168.200.131"
port="5672"
username="guest"
password="guest"
virtual-host="/"
publisher-returns="true" />
(2)发送消息的时候,通过RabbitTemplate来设置回退回调
@Test
public void testReturn(){
//设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
/**
* message:消息对象
* replyCode:错误码
* replyText:错误信息
* exchange:交换机
* routingKey:路由
*/
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了...");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
rabbitTemplate.convertAndSend(
"test_exchange_confirm","test_routingKey_confirm","message confirm...");
}
对于确认模式:
设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
对于退回模式
设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
Consumer的ACK确认模式
ack指 Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
• 自动确认:acknowledge=“none”
• 手动确认:acknowledge=“manual”
• 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
消费者的配置文件中
<rabbit:listener-container connection-factory="connectionFactory"
acknowledge="manual/none">
</rabbit:listener-container>
编写监听器类
@Component
public class AckListener implements ChannelAwareMessageListener{
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println(new String(message.getBody()));
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try{
Thread.sleep(1000);
//处理业务逻辑
//System.out.println("处理出错误了");
//int i = 10/0;
System.out.println("处理业务逻辑");
//channel.basicAck(deliveryTag,true);
}catch (Exception e){
channel.basicNack(deliveryTag,true,true);
}
}
}
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
消费的并发处理设置
可以通过MQ中的 listener-container 配置属性
perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
<rabbit:listener-container connection-factory="connectionFactory"
acknowledge="manual" prefetch=“200”>
</rabbit:listener-container>
队列的TTL设置
TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
<rabbit:queue id="xx" name="xx" auto-declare="true" durable="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange name="yy">
<rabbit:bindings>
<rabbit:binding key="routingKey" queue="xx"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
rabbitTemplate.convertAndSend("yy","routingKey",body);
直接丢弃消息不合适。
如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
队列过期后,会将队列所有消息全部移除。
消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
小结
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
死信队列、
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
死信队列:未被正常消费的消息存储的容器
(1)超时未被处理 (2)超出队列长度 (3)消费者处理失败 channel.basicNack() /basicReject()–>false不自动进入队列
延迟队列
问题: 消息的消费在指定的时间之后进行。
实现: TTL(设置消息的过期时间) +DLX(消息进入死信队列)
延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
Rabbitmq实现延迟队列:实现: TTL(设置消息的过期时间) +DLX(消息进入死信队列)
消息补偿
作用: 确保消息一定发送出去并且被正确消费
优化:
在这里插入图片描述
消息幂等性
幂等性:同一个消息被消费多次,最终得到的结果应该是一样。
解决方案: N个相同的消息,只会执行一个消息的内容,其他的消息,想办法把的操作排除掉。
数据库
table
id name price … version【int】
1 xx xxxx 1
N个相同的消息
第一次消息
update table set version = version +1 where id = 1 where version = 1
---->version 2
所有的其他消息
update table set version = version +1 where id = 1 where version = 1
Redis
N个相同的消息
第一个消息:
redis set(key,value);
所有的其他消息
redis get(key)—>结果
获取到了 : 过
获取不到 : 执行业务操作,将key存入Redis
总结:
SpringBoot整合RabbitMQ [掌握]
》创建一个maven项目(jar)
》pom.xml
parent: spring-boot-starter-parent
dependency: spring-boot-starter-amqp
dependency: spring-boot-starter-test
》application.yml
rabbitMQ连接的五要素:
host / port / username/ password / virtualHost
》引导类
生产者
@Configuration @Bean
Queue(QueueBuilder) Exchange(ExchangeBuilder) Binding(BindingBuilder)
@Autowired
private RabbitTemplate rabbitTemplate;
rabbitTemplate.convertAndSend(exchangeName,routingKey,body);
消费者
@RabbitListener(queues="queueName")
public void method(Message message){
//message消息对象 getBody()
}
-------------------------面试(说)-------------------------------
消息投递的可靠性(生产者)
(1) producer----->Exchange
confirm
成功发送到Exchange
未成功发送到Exchange
???
(2) Exchange----->Queue
return
当交换机未正确发送消息到队列的时候
(3) RabbitMQ(broker)是否正常
集群
消息的限流(消费者)
(4) Consumer---->Queue获取到消息
Ack
自动确认
手动确认
channel.basicAck()
channel.basicNack()
(1)消息确认方式一定得是手动确认
(2)prefetch=“200”
TTL
设定消息队列Queue中【消息】的存活时间
死信队列
用来存储处理失败的消息
》超时的(TTL)
》超出队列存储容量(MaxLength)
》消费者通过channel.basicNack()/basicReject()
应用:
消息补偿
确保消息100%能够正确被消费
Producer记录所有发送的消息
Consumer记录所有正确消费的消息
Producer记录所有发送的消息-Consumer记录所有正确消费的消息=未被正确消息的消息(交给消费者重新发送)
Quartz/SpringTask
Redis(Set|sdiff)/Mysql
消息幂等性
同一个消息被消费多次,最终得到的结果一致
第一个消费消息的时候,执行具体的业务,其他的消费都直接过掉[不要再去执行之前的业务]
数据库的乐观锁
Redis的Key [单线程]
-------------------------了解----------------------------
日志监控
消息追踪
集群
标签:rabbitTemplate,exchange,队列,RabbitMQ,消息,message,public 来源: https://blog.csdn.net/weixin_44993313/article/details/103944052