其他分享
首页 > 其他分享> > 消息中间件RabbitMQ(四)——消息收发方式

消息中间件RabbitMQ(四)——消息收发方式

作者:互联网

文章目录

1. RabbitMQ 工作原理

在这里插入图片描述

解释说明:

  1. 生产者(Producer):发布消息到 RabbitMQ中的交换机(Exchange)上
  2. 交换机(Exchange):和生产者建立连接并接收生产者的消息,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
  3. 队列(Queue):Exchange将消息分发到指定的 QueueQueue和消费者进行交互。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据
  4. 路由(Routes):交换机转发消息到队列的规则
  5. 消费者(Consumer):监听 RabbitMQ中的 Queue中的消息

注意:生产者,消费者和消息中间件很多时候并不在同一机器上;同一个应用程序既可以是生产者又是可以是消费者。

2. RabbitMQ 七种消息收发方式

RabbitMQ官网介绍了如下七种消息分发的形式,下面逐一代码实现。代码可在文章最后查看。

2.1 代码环境

SpringBoot : 2.5.7RabbitMQ:3.9.13

application.properties 中配置 RabbitMQ的基本连接信息,如下:

server.port=8888
spring.rabbitmq.host=192.168.3.157
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

RabbitMQ中,所有的消息生产者提交的消息都会交由 Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的 Queue中。

下面所有的生产者和消费者都在同一个工程中,方便测试

2.2 消息收发

2.2.1 Hello World

消息传播图如下:

在这里插入图片描述

这种消息分发采用的是默认的Exchange,在RabbitMQWeb客户端中,可以看到RabbitMQ提供的交换机

在这里插入图片描述

定义队列:

@Configuration
public class RabbitMQConfig {

    // 队列的名称
    public static final String SCORPIOS_QUEUE_NAME = "scorpios_queue_name";

    /**
     * 第一个参数是消息队列名字
     * 第二个参数表示消息是否持久化
     * 第三个参数表示消息队列是否排他,一般都是设置为 false,即不排他
     * 第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列
     * @return
     */
    @Bean
    Queue queue() {
        return new Queue(RabbitMQConfig.SCORPIOS_QUEUE_NAME,true,false,false);
    }

}

消费者定义:

@Slf4j
@Component
public class ScorpiosConsumer {

    @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
    public void consume(String msg) {
        log.info("消费者收到的消息为:{}",msg);
    }
}

消息发送:模拟发送请求来向RabbiMQ发送消息

@RestController
public class RabbitMQController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/send/message")
    public String test(){
        log.info("接收到客户端消息,向RabbitMQ发送消息...");
        rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_QUEUE_NAME, "hello scorpios....");
        log.info("-----------------------------------");
        return "success";
    }

}

测试结果:浏览器输入:http://localhost:8888/send/message,看控制台日志输入

在这里插入图片描述

在上面的代码中,并没有创建Exchange,而是使用默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange上,当一条消息到达 DirectExchange时会被转发到与该条消息 routing key 相同的 Queue上,例如消息队列名为 scorpios_queue_name,则 routingkeyscorpios_queue_name”的消息会被该消息队列接收。

正如官网介绍这种方式的一样:The simplest thing that does something,这种方式最为简单

说明:如果此处的生产者在另外一个工程中,只需要把Controller和RabbitMQConfig复制过去,就可以了。

2.2.2 Work queues

这种方式的主要考虑的是:消息队列的消息如何被消费者消费

一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:

在这里插入图片描述

一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。

消费者也可以配置各自的并发能力,进而提高消息的消费能力,消费者也可以配置手动 ack,来决定是否要消费某一条消息

先看并发能力的配置,如下:

@Slf4j
@Component
public class ScorpiosConsumer {

    @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
    public void consumeOne(String msg) {
        log.info("consumeOne消费者收到的消息为:{}",msg);
    }

    // 表示此消费者会创建5个线程来执行
    @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME,concurrency = "5")
    public void consumeTwo(String msg) {
        log.info("consumeTwo消费者收到的消息为:{}",msg);
    }
}

第二个消费者我配置了 concurrency为 5,此时,对于第二个消费者,将会同时存在5 个子线程去消费消息

启动项目,在 RabbitMQ后台也可以看到一共有 6个消费者。一个连接,具有6Channel

在这里插入图片描述

此时,如果生产者发送 5条消息,就会一下都被消费掉

消息发送方式如下:

@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/send/message")
    public String test(){
        log.info("接收到客户端消息,向RabbitMQ发送消息...");
        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_QUEUE_NAME, "hello scorpios...." + i);
        }
        log.info("-----------------------------------");
        return "success";
    }
}

测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

在这里插入图片描述

可以看到,消息都被第二个消费者消费了。但需要注意,多试几次可以看到,消息也有可能被第一个消费者消费

下面来看一下,消费者开启手动 ack,这样可以自行决定是否消费 RabbitMQ发来的消息,配置手动 ack需要在配置文件中添加如下配置:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费代码如下:

@Slf4j
@Component
public class ScorpiosConsumer {

    @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
    public void consumeOne(Message message, Channel channel) throws IOException {
        log.info("consumeOne消费者收到的消息为:{}",message.getPayload());
        // 确认消费消息
        channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);
    }

    @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME,concurrency = "5")
    public void consumeTwo(Message message, Channel channel) throws IOException {
        log.info("consumeTwo消费者收到的消息为:{},消费线程为:{}", message.getPayload(), Thread.currentThread().getName());
        // 拒绝消费消息
        channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)), true);
    }
}

测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

在这里插入图片描述

此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息

2.2.3 Publish/Subscribe

这种方式主要考虑的是:消息到达交换机后,如何转到消息队列

一个生产者(Producer),一个交换机(Exchange),多个消费者(Consumer),每一个消费者都有自己的一个队列(Queue

生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的

需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQExchange不具备存储消息的能力,只有队列具备存储消息的能力,如下图:

在这里插入图片描述

这里交换机有四种选择,分别是:

2.2.3.1 Direct

DirectExchange的路由策略是将消息队列(Queue)绑定到一个 DirectExchange上,当一条消息到达 DirectExchange时会被转发到与该条消息 routing key 相同的 Queue上。

配置类:

@Configuration
public class RabbitMQConfig {

    // 交换机的名称
    public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";

    // 队列的名称
    public static final String SCORPIOS_QUEUE_NAME = "scorpios_queue_name";

    /**
     * 创建一个DirectExchange交换机
     * 第一个参数:交换机名字
     * 第二个参数:重启后是否依然有效
     * 第三个参数:长期未用时是否删除
     * @return
     */
    @Bean
    DirectExchange directExchange(){
        return new DirectExchange(RabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue queue() {
        return new Queue(RabbitMQConfig.SCORPIOS_QUEUE_NAME,true,false,false);
    }

    // 将队列与DirectExchange绑定,要指定routingkey
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
    }
    
}

Binding其实是ExchangeQueue之间的桥梁,它告诉我们Exchange和哪个Queue进行了绑定关系

消费者:

@Slf4j
@Component
public class DirectConsumer {

    @RabbitListener(queues = RabbitMQConfig.SCORPIOS_QUEUE_NAME)
    public void consume(String msg) {
        log.info("consume消费者收到的消息为:{}",msg);
    }
}

发送者:

@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/send/message")
    public String test(){
        log.info("接收到客户端消息");
        // 要添加routingkey参数
        rabbitTemplate.convertAndSend(RabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"direct","hello scorpios...");
        return "success";
    }
    
}

测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

在这里插入图片描述

在配置类中要把DirectExchange和Queue进行binding,并且要指定routingkey

同时,在发送者的代码中,要指定交换机的名字和routingkey

结合下面这张图再次理解下,交换机的类型为DirectExchange

在这里插入图片描述

2.2.3.2 Fanout

FanoutExchange的数据交换策略是把所有到达 FanoutExchange的消息转发给所有与它绑定的 Queue上,在这种策略中,routingkey将不起任何作用,FanoutExchange配置方式如下:

@Configuration
public class FanoutRabbitMQConfig {
    
    // 交换机的名称
    public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";

    // 队列的名称
    public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_one";
    public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_two";

    /**
     * 创建一个FanoutExchange交换机
     * 第一个参数:交换机名字
     * 第二个参数:重启后是否依然有效
     * 第三个参数:长期未用时是否删除
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange(FanoutRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue queueOne() {
        return new Queue(FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false);
    }

    @Bean
    Queue queueTwo() {
        return new Queue(FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false);
    }

    // 将队列与FanoutExchange绑定
    @Bean
    Binding bindingOne() {
        return BindingBuilder.bind(queueOne()).to(fanoutExchange());
    }

    @Bean
    Binding bindingTwo() {
        return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
    }

}

上面创建 FanoutExchange,参数含义与创建 DirectExchange参数含义一致,然后创建两个 Queue,再将这两个 Queue都绑定到 FanoutExchange上。接下来创建两个消费者,如下:

@Slf4j
@Component
public class FanoutConsumer {

    @RabbitListener(queues = FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE)
    public void consumeOne(String msg) {
        log.info("consumeOne消费者收到的消息为:{}",msg);
    }

    @RabbitListener(queues = FanoutRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO)
    public void consumeTwo(String msg) {
        log.info("consumeTwo消费者收到的消息为:{}",msg);
    }

}

发送者:

@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/send/message")
    public String test(){
        log.info("接收到客户端消息");
        // routingkey 参数为null
        rabbitTemplate.convertAndSend(FanoutRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,"hello scorpios... FanoutExchange");
        return "success";
    }}

}

注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null

测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

在这里插入图片描述

FanoutExchange 交换机是将接收到的所有消息广播到它知道的所有队列中

来看下面这张图:

在这里插入图片描述

如果Exchange的绑定类型是Direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是Direct,但是它表现的就和Fanout有点类似了,就跟广播差不多,如上图所示。

2.2.3.3 Topic

TopicExchange是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange中,Queue通过 routingkey绑定到 TopicExchange上,当消息到达 TopicExchange后,TopicExchange 根据消息的routingkey 将消息路由到一个或者多个Queue `上。

在这里插入图片描述

TopicExchange配置如下:

@Configuration
public class TopicsRabbitMQConfig {

    // 交换机的名称
    public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";

    // 队列的名称
    public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_xiaomi";
    public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_huawei";
    public static final String SCORPIOS_QUEUE_NAME_THREE = "scorpios_queue_name_phone";

    /**
     * 创建一个TopicExchange交换机
     * 第一个参数:交换机名字
     * 第二个参数:重启后是否依然有效
     * 第三个参数:长期未用时是否删除
     * @return
     */
    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue xiaomi() {
        return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false);
    }

    @Bean
    Queue huawei() {
        return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false);
    }

    @Bean
    Queue phone() {
        return new Queue(TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_THREE,true,false,false);
    }

    // 将队列与TopicExchange绑定
    @Bean
    Binding bindingXiaomi() {
        return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
    }

    @Bean
    Binding bindingHuawei() {
        return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
    }

    @Bean
    Binding bindingPhone() {
        return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");
    }

}

接下来针对三个 Queue创建三个消费者,如下:

@Slf4j
@Component
public class TopicsConsumer {

    @RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE)
    public void consumeXiaomi(String msg) {
        log.info("consumeXiaomi消费者收到的消息为:{},匹配routingkey:xiaomi.#",msg);
    }

    @RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO)
    public void consumeHuawei(String msg) {
        log.info("consumeHuawei消费者收到的消息为:{},匹配routingkey:huawei.#",msg);
    }

    @RabbitListener(queues = TopicsRabbitMQConfig.SCORPIOS_QUEUE_NAME_THREE)
    public void consumePhone(String msg) {
        log.info("consumePhone消费者收到的消息为:{},匹配routingkey:#.phone.#",msg);
    }

}

发送者:

@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    RabbitTemplate rabbitTemplate;

     @GetMapping("/send/message")
    public String test(){
        log.info("接收到客户端消息");
        rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"xiaomi.news","小米新闻,xiao.news");
        rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"huawei.news","华为新闻,huawei.news");
        rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"xiaomi.phone","小米手机,xiaomi.phone");
        rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"huawei.phone","华为手机,huawei.phone");
        rabbitTemplate.convertAndSend(TopicsRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,"phone.news","手机新闻,phone.news");
        return "success";
    }

}

根据 TopicsRabbitMQConfig中的配置,测试结果应该如下:

测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

在这里插入图片描述

2.2.3.4 Header

HeadersExchange是一种使用较少的路由策略,HeadersExchange会根据消息的 Header将消息路由到不同的 Queue上,这种策略也和 routingkey无关,HeadersExchange配置如下:

@Configuration
public class HeaderRabbitMQConfig {

    // 交换机的名称
    public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";

    // 队列的名称
    public static final String SCORPIOS_QUEUE_NAME_ONE = "scorpios_queue_name_name";
    public static final String SCORPIOS_QUEUE_NAME_TWO = "scorpios_queue_name_age";

    /**
     * 创建一个HeadersExchange交换机
     * 第一个参数:交换机名字
     * 第二个参数:重启后是否依然有效
     * 第三个参数:长期未用时是否删除
     * @return
     */
    @Bean
    HeadersExchange headersExchange(){
        return new HeadersExchange(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue queueName() {
        return new Queue(HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE,true,false,false);
    }

    @Bean
    Queue queueAge() {
        return new Queue(HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO,true,false,false);
    }

    // 将队列与HeadersExchange绑定
    @Bean
    Binding bindingName() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", "scorpios");
        return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
    }

    @Bean
    Binding bindingAge() {
        return BindingBuilder.bind(queueAge()).to(headersExchange()).whereAny("age").exist();
    }

}

这里主要关注下 Binding的配置上,第一个 bindingName方法中,whereAny表示消息的 Header中只要有一个 Header匹配上 map中的 key/value,就把该消息路由到名为 scorpios_queue_name_nameQueue上,这里也可以使用 whereAll方法,表示消息的所有 Header都要匹配。whereAnywhereAll实际上对应了一个名为 x-match 的属性。bindingAge中的配置则表示只要消息的 Header中包含 age,不管 age的值是多少,都将消息路由到名为 scorpios_queue_name_ageQueue

消费者:

@Slf4j
@Component
public class HeaderConsumer {

    @RabbitListener(queues = HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_ONE)
    public void consumeName(String msg) {
        log.info("consumeName消费者收到的消息为:{}",msg);
    }

    @RabbitListener(queues = HeaderRabbitMQConfig.SCORPIOS_QUEUE_NAME_TWO)
    public void consumeAge(String msg) {
        log.info("consumeAge消费者收到的消息为:{}",msg);
    }

}

发送者:

@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/send/message")
    public String test(){
        log.info("接收到客户端消息");
        Message name = MessageBuilder.withBody("header exchange, scorpios_queue_name_name".getBytes())
                                        .setHeader("name", "scorpios").build();
        Message age = MessageBuilder.withBody("header exchange, scorpios_queue_name_age".getBytes())
                                        .setHeader("age", "20").build();
        rabbitTemplate.convertAndSend(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,name);
        rabbitTemplate.convertAndSend(HeaderRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,null,age);
        return "success";
    }

}

创建并发送两条消息,两条消息具有不同的 header,不同 header的消息将被发到不同的 Queue中去

测试结果:浏览器输入:http://localhost:8889/send/message,看控制台日志输入

在这里插入图片描述

2.2.3.5 小结

2.2.4 Routing

一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange后,根据 RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey即可,看下图理解下:

在这里插入图片描述

这种方式就是按照 routing key 去路由消息,可以参考上面DirectExchangeTopicExchange使用

2.2.5 Topics

一个生产者,一个交换机,两个队列,两个消费者,生产者创建 TopicExchange并且绑定到队列中,这次绑定可以通过 *# 关键字,对指定 RoutingKey 内容,编写使用通配符,看下图理解下:

在这里插入图片描述

这种方式就是按照 routing key 去路由消息,可以参考上面TopicExchange使用

2.2.6 RPC

RabbitMQ提供了RPC功能,原理图如下:

在这里插入图片描述

原理解释:

具体示例,下一篇实现~

2.2.7 Publisher Confirms

在解决消息可靠性的问题时,有两种方式:事务和消息确认。

对于消息是否被成功消费,可以使用这种方式——消息确认机制。消息确认分为:自动确认和手动确认。

在上面的代码中,大部分都使用了自动确认。除了在介绍Work Queues方式时,消费者开启了手动 ack

这种方式很重要,后续单独研究吧~~

标签:NAME,队列,RabbitMQ,Queue,收发,消息,SCORPIOS,消息中间件,public
来源: https://blog.csdn.net/zxd1435513775/article/details/122789734