RabbitMQ不讲武德,发个消息也这么多花招
作者:互联网
前言
本篇博客已被收录GitHub:https://zhouwenxing.github.io/
文中所涉及的源码也已被收录GitHub:https://github.com/zhouwenxing/lonely-wolf-note (message-queue模块)
使用消息队列必须要保证生产者发送的消息能被消费者所接收,那么生产者如何接收消息呢?下图是 RabbitMQ
的工作模型:
上图中生产者会将消息发送到交换机 Exchange
上,再由 Exchange
发送给不同的 Queue
,而 Queue
是用来存储消息队列,那么假如有多个生产者,那么消息发送到交换机 Exchange
之后,应该如何和 Queue
之间建立绑定关系呢?
如何使用 RabbitMQ 发送消息
RabbitMQ
中提供了3种发送消息的路由方式。
直连 Direct 模式
通过指定一个精确的绑定键来实现 Exchange
(交换机) 和 Queue
(消息队列) 之间的绑定,也就是说,当创建了一个直连类型的交换机时,生产者在发送消息时携带的路由键(routing key),必须与某个绑定键(binding key)完全匹配时,这条消息才会从交换机路由到满足路由关系消息队列上,然后消费者根据各自监听的队列就可以获取到消息(如下如吐所示,Queue1
绑定了 order
,那么这时候发送消息的路由键必须为 order
才能分配到 Queue1
上):
主题 Topic 模式
Direct
模式会存在一定的局限性,有时候我们需要按类型划分,比如订单类路由到一个队列,产品类路由到另一个队列,所以在 RabbitMQ 中,提供了主题模式来实现模糊匹配。使用主题类型连接方式支持两种通配符:
直连方式只能精确匹配,有时候我们需要实现模糊匹配,那么这时候就需要主题类型的连接方式,在 RabbitMQ
中,使用主题类型连接方式支持两种通配符:
-
:表示
0
个或者多个单词 - *:表示
1
个单词
PS:使用通配符时,单词指的是用英文符号的小数点 .
隔开的字符,如:abc.def
就表示有 abc
和 def
两个单词。
下图所示中,因为 Queue1
绑定了 order.#
,所以当发送消息的路由键为 order
或者 order.xxx
时都可以使得消息分配到 Queue1
上:
广播 Fanout 模式
当我们定义了一个广播类型的交换机时就不需要指定绑定键,而且生产者发送消息到交换机上时,也不需要携带路由键,此时当消息到达交换机时,所有与其绑定的队列都会收到消息,这种模式的消息发送适用于消息通知类需求。
如下如所示,Queue1
,Queue2
,Queue3
三个队列都绑定到了一个 Fanout
交换机上,那么当 Fanout Exchange
收到消息时,会同时将消息发送给三个队列:
在 RabbitMQ
提供的后台管理系统中也能查询到创建的交换机和队列等信息,并且可以通过管理后台直接创建队列和交换机:
消息发送实战
下面通过一个 SpringBoot
例子来体会一下三种发送消息的方式。
- 1、
application.yml
文件中添加如下配置:
spring:
rabbitmq:
host: ip
port: 5672
username: admin
password: 123456
- 2、新增一个
RabbitConfig
配置类(为了节省篇幅省略了包名和导入 ),此类中声明了三个交换机和三个队列,并分别进行绑定:
@Configuration
public class RabbitConfig {
//直连交换机
@Bean("directExchange")
public DirectExchange directExchange(){
return new DirectExchange("LONGLY_WOLF_DIRECT_EXCHANGE");
}
//主题交换机
@Bean("topicExchange")
public TopicExchange topicExchange(){
return new TopicExchange("LONGLY_WOLF_TOPIC_EXCHANGE");
}
//广播交换机
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange("LONGLY_WOLF_FANOUT_EXCHANGE");
}
@Bean("orderQueue")
public Queue orderQueue(){
return new Queue("LONGLY_WOLF_ORDER_QUEUE");
}
@Bean("userQueue")
public Queue userQueue(){
return new Queue("LONGLY_WOLF_USER_QUEUE");
}
@Bean("productQueue")
public Queue productQueue(){
return new Queue("LONGLY_WOLF_PRODUCT_QUEUE");
}
//Direct交换机和orderQueue绑定,绑定键为:order.detail
@Bean
public Binding bindDirectExchange(@Qualifier("orderQueue") Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with("order.detail");
}
//Topic交换机和userQueue绑定,绑定键为:user.#
@Bean
public Binding bindTopicExchange(@Qualifier("userQueue") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("user.#");
}
//Fanout交换机和productQueue绑定
@Bean
public Binding bindFanoutExchange(@Qualifier("productQueue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
- 3、新建一个消费者
ExchangeConsumer
类,不同的方法实现分别监听不同的队列:
@Component
public class ExchangeConsumer {
/**
* 监听绑定了direct交换机的的消息队列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE")
public void directConsumer(String msg){
System.out.println("direct交换机收到消息:" + msg);
}
/**
* 监听绑定了topic交换机的的消息队列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_USER_QUEUE")
public void topicConsumer(String msg){
System.out.println("topic交换机收到消息:" + msg);
}
/**
* 监听绑定了fanout交换机的的消息队列
*/
@RabbitHandler
@RabbitListener(queues = "LONGLY_WOLF_PRODUCT_QUEUE")
public void fanoutConsumer(String msg){
System.out.println("fanout交换机收到消息:" + msg);
}
}
- 4、新增一个
RabbitExchangeController
类来作为生产者,进行消息发送:
@RestController
@RequestMapping("/exchange")
public class RabbitExchangeController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/send/direct")
public String sendDirect(String routingKey,@RequestParam(value = "msg",defaultValue = "no direct message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_DIRECT_EXCHANGE",routingKey,msg);
return "succ";
}
@GetMapping(value="/send/topic")
public String sendTopic(String routingKey,@RequestParam(value = "msg",defaultValue = "no topic message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_TOPIC_EXCHANGE",routingKey,msg);
return "succ";
}
@GetMapping(value="/send/fanout")
public String sendFaout(String routingKey,@RequestParam(value = "msg",defaultValue = "no faout message") String msg){
rabbitTemplate.convertAndSend("LONGLY_WOLF_FANOUT_EXCHANGE",routingKey,msg);
return "succ";
}
}
- 5、启动服务,当我们调用第一个接口时候,路由键和绑定键
order.detail
精确匹配时,directConsumer
就会收到消息,同样的,调用第二接口时,路由键满足user.#
时,topicConsumer
就会收到消息,而只要调用第三个接口,不论是否指定路由键,fanoutConsumer
都会收到消息。
消息过期了怎么办
简单的发送消息我们学会了,难道这就能让我们就此止步了吗?显然是不能的,要玩就要玩高级点,所以接下来让我们给消息加点佐料。
标签:队列,绑定,RabbitMQ,讲武,交换机,消息,msg,发个,public 来源: https://www.cnblogs.com/wjxzs/p/14236557.html