其他分享
首页 > 其他分享> > 2021-10-22

2021-10-22

作者:互联网

文章目录

一、 AMQP简介

1 AMQP是什么?

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是进程之间传递异步消息的网络协议。

2 AMQP工作过程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
在这里插入图片描述

3 队列

队列是数据结构中概念。数据存储在一个队列中,数据是有顺序的,先进的先出,后进后出。其中一侧负责进数据,另一次负责出数据。
MQ(消息队列)很多功能都是基于此队列结构实现的

二、 RabbitMQ简介

1 RabbitMQ介绍

RabbitMQ是由Erlang语言编写的基于AMQP的消息中间件。而消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

1.1 解决应用耦合
1.1.1 不使用MQ时
在这里插入图片描述

1.1.2 使用MQ解决耦合
在这里插入图片描述

2 RabbitMQ适用场景

排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、流量销峰等。

三、 RabbitMQ原理

在这里插入图片描述

1.Message

消息。消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由
一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。

2.Publisher

消息的生产者。也是一个向交换器发布消息的客户端应用程序。

3.Consumer

消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。

4.Exchange

交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
三种常用的交换器类型

  1. direct(发布与订阅 完全匹配)
  2. fanout(广播)
  3. topic(主题,规则匹配)

5.Binding

绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

6.Queue

消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。

7.Routing-key

路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的名称,路由键是key,队列是value)
队列通过路由键绑定到交换器。
消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。
如果相匹配,消息将会投递到该队列。
如果不匹配,消息将会进入黑洞。

8.Connection

链接。指rabbit服务器和服务建立的TCP链接。

9.Channel

信道。
1,Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。
2,TCP一旦打开,就会创建AMQP信道。
3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。

10.Virtual Host

虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是/

11.Borker

表示消息队列服务器实体。

交换器和队列的关系

交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。
 也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。
 路由键可以理解为匹配的规则。

RabbitMQ为什么需要信道?为什么不是TCP直接通信?

  1. TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。
  2. 如果不用信道,那应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。
  3. 信道的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

四、 RabbitMq账户管理

1 创建账户

# ./rabbitmqctl add_user bjsxt bjsxt

2 给用户授予管理员角色

# ./rabbitmqctl set_user_tags bjsxt administrator

3 给用户授权

# ./rabbitmqctl set_permissions -p "/" bjsxt ".*" ".*" ".*"

4 登录

使用新建账户和密码在windows中访问rabbitmq并登录

在浏览器地址栏输入:
http://192.16814.129:15672/

用户名:bjsxt
密码:bjsxt

五、 Exchange 交换器(交换机)

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在RabbitMQ中支持四种交换器

  1. Direct Exchange:直连交换器(默认)
  2. Fanout Exchange:扇形交换器
  3. Topic Exchange:主题交换器
  4. Header Exchange:首部交换器。

在RabbitMq的Web管理界面中Exchanges选项卡就可以看见这四个交换器。

1 direct交换器

direct交换器是RabbitMQ默认交换器。默认会进行公平调度。所有接受者依次从消息队列中获取值。Publisher给哪个队列发消息,就一定是给哪个队列发送消息。对交换器绑定的其他队列没有任何影响。
(代码演示)一个队列需要绑定多个消费者
需要使用注解/API:
org.springframework.amqp.core.Queue:队列
AmqpTemplate:操作RabbitMQ的接口。负责发送或接收消息
@RabbitListener(queues = “”) 注解某个方法为接收消息方法

2 fanout交换器

在这里插入图片描述

扇形交换器,实际上做的事情就是广播,fanout会把消息发送给所有的绑定在当前交换器上的队列。对应Consumer依然采用公平调度方式。
(代码演示)一个交换器需要绑定多个队列
需要使用注解/API:
FanoutExchange:fanout交换器
Binding:绑定交换器和队列
BindingBuilder:Binding的构建器
amq.fanout:内置fanout交换器名称

3 topic交换器

在这里插入图片描述

允许在路由键(RoutingKey)中出现匹配规则。
路由键的写法和包写法相同。com.bjsxt.xxxx.xxx格式。
在绑定时可以带有下面特殊符号,中间可以出现:
* : 代表一个单词(两个.之间内容)
# : 0个或多个字符
接收方依然是公平调度,同一个队列中内容轮换获取值。
需要使用注解/API:
TopicExchange:Topic交换器
amq.topic:内置topic交换器名称

六、 例—direct交换器 -------点对点

1.父项目amqp_rabbit

1.1 pom.xml

<modelVersion>4.0.0</modelVersion>
<groupId>com.bjsxt</groupId>
<artifactId>amqp_rabbit</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>

<modules>
	<module>amqp_rabbit_consumer</module>
	<module>amqp_rabbit_publisher</module>
	<module>amqp_rabbit_pojo</module>
</modules>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-dependencies</artifactId>
			<version>2.2.5.RELEASE</version>
			<scope>import</scope>
			<type>pom</type>
		</dependency>
	</dependencies>
</dependencyManagement>

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<!-- Spring Boot提供的关于AMQP协议实现的启动器。可以使用AMQP协议快速的访问MQ消息中间件。 -->
	-<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
</dependencies>

2.amqp_rabbit_consumer

消息消费者工程:
启动,注册监听,连接RabbitMQ,监听某个队列Queue,实现消息消费。
Queue - RabbitMQ自动创建。
规则: Queue存在在使用现有的Queue,不存在则创建一个。 交换器创建规则相同。

2.1 pom.xml

-<parent>
		<artifactId>amqp_rabbit</artifactId>		
		<groupId>com.bjsxt</groupId>		
		<version>1.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>	
<artifactId>amqp_rabbit_consumer</artifactId>		
<version>1.0-SNAPSHOT</version>

<dependencies>
 	<dependency>
		<groupId>com.bjsxt</groupId>	
		<artifactId>amqp_rabbit_pojo</artifactId>		
		<version>1.0-SNAPSHOT</version>	
	</dependency>
</dependencies>

2.2 InfoLogconsumer.java

//消费info的consumer,同时加@Component注解配置为bean对象让spring管理

package com.bjsxt.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 日志消息消费者,只消费Info日志。
 * 日志消息存储在队列 log-info-queue
 * 使用的交换器名称是 log-ex-direct
 * 交换器类型是 direct
 * 队列的路由键是 direct-rk-info
 *
 * 注解RabbitListener - 监听注解。可以描述类型和方法。
 *  类型 - 当前类型监听某个队列。
 *  方法 - 当前方法监听某个队列。
 *  属性 -
 *    bindings - QueueBinding[]类型,代表这个类型或方法监听的队列、交换器、路由键的绑定方式
 * 注解QueueBinding -
 *  属性 -
 *    value - 绑定监听的队列是什么
 *    exchange - 队列对应的交换器是什么
 *    key - 队列的路由键是什么
 * 注解Queue - 描述一个队列
 *  属性 -
 *    value|name - 队列名称
 *    autoDelete - 是否自动删除。默认为"", 如果队列名称定义,不自动删除;队列名称不定义,队列为自动删除队列。
 *      如果是自动删除,代表所有的consumer关闭后,队列自动删除。
 * 注解Exchange - 描述一个交换器
 *  属性 -
 *    value|name - 交换器名称
 *    type - 交换器的类型,可选direct|fanout|topic, 默认direct
 *    autoDelete - 是否自动删除,默认为false。不自动删除。
 */
@RabbitListener(bindings = {
    @QueueBinding(
            value = @Queue(value = "log-info-queue", autoDelete = "false"),
            exchange = @Exchange(value = "log-ex-direct", type = "direct", autoDelete = "false"),
            key = "direct-rk-info"
    )
})
@Component
public class InfoLogConsumer {
    /**
     * 消息消费的方法。当队列log-info-queue中出现消息,立刻消费。
     *
     * RabbitHandler注解 - 配合类型上的RabbitListener注解,标记当前的方法,是一个监听消息队列,消费消息的方法。
     *
     * @param msg 消息内容。
     */
    @RabbitHandler
    public void onMessage(String msg){
        System.out.println("InfoLogConsumer 消费消息:" + msg);
    }
}

2.3 LogConsumers.java

//消费error和warn的consumer,同时加@Component注解配置为bean对象让spring管理

package com.bjsxt.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 日志消息消费者,消费error和warn日志。
 * error日志消息存储在队列 log-error-queue
 * warn日志消息存储在队列 log-warn-queue
 * 使用的交换器名称是 log-ex-direct
 * 交换器类型是 direct
 * error队列的路由键是 direct-rk-error
 * warn队列的路由键是 direct-rk-warn
 */
@Component
public class LogConsumers {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "log-error-queue"),
                    exchange = @Exchange(value = "log-ex-direct"),
                    key = "direct-rk-error"
            )
    })
    public void onLogErrorMessage(String msg){
        System.out.println("错误日志信息:" + msg);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "log-warn-queue", autoDelete = "false"),
                    exchange = @Exchange(value = "log-ex-direct"),
                    key = "direct-rk-warn"
            )
    })
    public void onLogWarnMessage(String msg){
        System.out.println("警告日志信息:" + msg);
    }
}

2.4 配置application.yml

spring:
  rabbitmq:
    host: 192.168.89.141  # RabbitMQ服务的地址,默认localhost
    port: 5672  # RabbitMQ的端口,默认5672。
    username: bjsxt # 访问RabbitMQ的用户名,默认guest
    password: bjsxt # 访问RabbitMQ的密码,默认guest
    virtual-host: /  # 访问RabbitMQ的哪一个虚拟主机,默认为 /

2.5 启动类RabbitConsumerApp

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitConsumerApp.class, args);
    }
}

2.6 搭建consumer的集群

//RabbitConsumerApp1

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitConsumerApp1 {
    public static void main(String[] args) {
        SpringApplication.run(RabbitConsumerApp1.class, args);
    }
}

3.amqp_rabbit_publisherf

3.1 pom.xml

-<parent>
	<artifactId>amqp_rabbit</artifactId>
	<groupId>com.bjsxt</groupId>
	<version>1.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>amqp_rabbit_publisher</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
	<dependency>
		<groupId>com.bjsxt</groupId>
		<artifactId>amqp_rabbit_pojo</artifactId>
		<version>1.0-SNAPSHOT</version>
	</dependency>
</dependencies>

3.2 LogMessageSender

package com.bjsxt.rabbit.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 发送消息的类型。
 * 把消息发送到RabbitMQ中。
 * 在spring-boot-starter-amqp中,启动器自动创建初始化一个AmqpTemplate,
 * 作为访问Amqp消息服务器(MQ中间件)的客户端对象。
 */
@Component
public class LogMessageSender {
    @Autowired
    private AmqpTemplate template;

    /**
     * 发送消息的方法。
     * template.convertAndSend(String exchange, String routingKey, Object message)
     * exchange - 交换器名称
     * routingKey - 路由键
     * message - 要发送的消息内容,就是传递的消息对象的消息体。
     */
    public void sendMessage(String exchange, String routingKey, String message){
        this.template.convertAndSend(exchange, routingKey, message);
    }
}

3.3 启动类

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitPublisherApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitPublisherApp.class, args);
    }
}

3.4 配置application.yml

spring:
  rabbitmq:
    host: 192.168.89.141
    username: bjsxt
    password: bjsxt

3.5 测试direct

//测试direct轮询

package com.bjsxt.test;

/**
 * 消息发送者测试类型
 */
@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
   
    @Autowired
    private LogMessageSender sender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";

    @Test
    public void testSend2Consumers(){
        for(int i = 0; i < 10; i++){
            this.sender.sendMessage(exchange, rkInfo, "info消息"+i);
            this.sender.sendMessage(exchange, rkError, "error消息"+i);
            this.sender.sendMessage(exchange, rkWarn, "warn消息"+i);
        }
    }

七.新建模块amqp_rabbit_pojo

package com.bjsxt.entity;

import java.io.Serializable;
import java.util.Objects;

// 实体类型
public class User implements Serializable {
    // 定义一个序列化唯一ID。
    public static final long serialVersionUID = 1L;
    private Long id;
    private String name;
    private int age;

    public User(){}
    //set get toString equals hashCode 

八.例—fanout交换器 -----广播

1.消费者FanoutConsumers

package com.bjsxt.rabbit.fanout;

import com.bjsxt.entity.User;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 广播交换器,消费者。
 */
@Component
public class FanoutConsumers {
    /**
     * 消费消息的方法。
     * @param user 消息体内容。
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-user-1", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-fanout", type = "fanout", autoDelete = "false")
            )
    })
    public void onMessage1(User user){
        System.out.println("onMessage1 run : " + user);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-user-2", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-fanout", type = "fanout")
            )
    })
    public void onMessage2(User user){
        System.out.println("onMessage2 run : " + user);
    }
}

2.消息发送者amqp_rabbit_publisher------UserMessageSender

package com.bjsxt.rabbit.fanoutsender;

import com.bjsxt.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 发送消息类型。消息发送到fanout交换器中。
 * 交换器名称是: ex-fanout
 */
@Component
public class UserMessageSender {
    @Autowired
    private AmqpTemplate template;

    /**
     * 发送消息方法。
     * @param user
     */
    public void send(User user){
        this.template.convertAndSend("ex-fanout", "", user);
    }
}

3.发送消息TestPublisher

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;

/**
 * 消息发送者测试类型
 */
@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {

    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
 
    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";

  
    @Test
    public void testSendUserMessage2Fanout(){
        for(int i = 0; i < 3; i++){
            User user = new User();
            user.setId((long) i);
            user.setName("姓名 - " + i);
            user.setAge(20+i);

            this.userMessageSender.send(user);
        }
    }



九.例—topic交换器(重点)

1.消费者TopicConsumers

package com.bjsxt.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 主题消息消费者。
 */
@Component
public class TopicConsumers {
    /**
     * 短信消息消费者,对应的routingKey是 user.rk.sms | order.rk.sms | pay.rk.sms | reg.rk.sms 等。
     * 分别代表,用户登录短信|订单下订成功通知短信|支付成功通知短信|注册码通知短信 等。
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-sms-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.sms"
            )
    })
    public void onUserSMSMessage(String message){
        System.out.println("用户短信消息内容是:" + message);
    }

    /**
     * 路由键包括: user.rk.email | reg.rk.email | pay.rk.email
     * @param message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-email-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.email"
            )
    })
    public void onUserEmailMessage(String message){
        System.out.println("用户邮件消息内容是:" + message);
    }

    /**
     * 所有的和 rk相关的消息,统一处理消费。
     * 包含的路由键有: user.rk.sms | user.rk.email | reg.rk.sms | reg.rk.email 等。
     * 不发短信,不发邮件,作为一个日志记录工具存在。
     * @param message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-all-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.*"
            )
    })
    public void onUserServiceMessage(String message){
        System.out.println("执行的消息处理逻辑是:" + message);
    }
}

2.消息发送者amqp_rabbit_publisher------TopicMessageSender

package com.bjsxt.rabbit.topicsender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 发送消息到主题交换器
 */
@Component
public class TopicMessageSender {
    @Autowired
    private AmqpTemplate template;

    /**
     * 发送消息的方法
     * @param exchange
     * @param routingKey
     * @param message
     */
    public void send(String exchange, String routingKey, String message){
        template.convertAndSend(exchange, routingKey, message);
    }
}

3.发送消息TestPublisher

package com.bjsxt.test;

import java.util.Random;

/**
 * 消息发送者测试类型
 */
@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {

    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";

    @Test
    public void testSendMessage2Topic(){
        // 随机数%6
        // 0 rk - user.rk.sms *.rk.*  *.rk.sms
        // 1 rk - user.rk.email   *.rk.* *.rk.email
        // 2 rk - order.rk.sms *.rk.*  *.rk.sms
        // 3 rk - order.rk.email  *.rk.* *.rk.email
        // 4 rk - reg.rk.sms *.rk.*  *.rk.sms
        // 5 rk - reg.rk.qq  *.rk.*
        Random r = new Random();
        for(int i = 0; i < 10; i++){
            int rInt = r.nextInt(100);
            if(rInt%6 == 0){
                this.topicMessageSender.send("ex-topic",
                        "user.rk.sms",
                        "用户登录验证码是123456 - 发送短信");
            }else if(rInt%6 == 1){
                this.topicMessageSender.send("ex-topic",
                        "user.rk.email",
                        "用户登录验证码是123456 - 发送到邮箱");
            }else if(rInt%6 == 2){
                this.topicMessageSender.send("ex-topic",
                        "order.rk.sms",
                        "订单下订成功 - 发送短信");
            }else if(rInt%6 == 3){
                this.topicMessageSender.send("ex-topic",
                        "order.rk.email",
                        "订单下订成功 - 发送到邮箱");
            }else if(rInt%6 == 4){
                this.topicMessageSender.send("ex-topic",
                        "reg.rk.sms",
                        "注册验证码是654321 - 发送短信");
            }else if(rInt%6 == 5){
                this.topicMessageSender.send("ex-topic",
                        "reg.rk.qq",
                        "注册验证码是654321 - 发送QQ信息");
            }
        }
    }

标签:10,22,交换器,队列,2021,rabbit,org,import,rk
来源: https://blog.csdn.net/qq_53609683/article/details/120901003