其他分享
首页 > 其他分享> > RabbitMq

RabbitMq

作者:互联网

介绍

AMQP和JMS消息服务

特性

常见概念

mq需要的类

RabbitMQ消息队列和核⼼概念

RabbitMQ:http://www.rabbitmq.com/

核心概念, 了解了这些概念,是使用好RabbitMQ的基础

image-20220909071135102

生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和 RoutingKey相匹配时,消息会被路由到对应的队列中

Docker安装RabbitMQ

#拉取镜像
docker pull rabbitmq:management

docker run -d --hostname rabbit_host1 --name xd_rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

#介绍
-d 以守护进程方式在后台运行
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--name:指定容器名
--hostname:设定容器的主机名,它会被写到容器内的 /etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中

-e 参数
  RABBITMQ_DEFAULT_USER 用户名
  RABBITMQ_DEFAULT_PASS 密码

主要端口介绍

4369 erlang 发现口

5672 client 端通信口

15672 管理界面 ui 端口

25672 server 间内部通信口

访问管理界面

ip:15672

注意事项!!!!

- Linux服务器检查防火墙是否关闭
- 云服务器检查网络安全组是否开放端口

CentOS 7 以上默认使用的是firewall作为防火墙
查看防火墙状态
firewall-cmd --state
停止firewall
systemctl stop firewalld.service
禁止firewall开机启动
systemctl disable firewalld.service

管控台介绍

image-20220909073520620

image-20220909073723103

队列与交换机概念

队列

简单队列

Producer--->Queue --->Consumer
生产者发送消息到队列,监听该队列的消费者取出消息

image-20220909080907274

队列轮询

image-20220911145822286

				           |--->Consumer(msg*10)
Producer(msg*30)--->Queue --(---->Consumer(msg*10)
				           |--->Consumer(msg*10)
生产者发送大量消息到队列,多个消费者监听该队列,均量取出消息			   
				   

队列公平

轮询策略的优化,当多个消费者的处理数据能力不均时,轮询策略会导致资源调用不合理,比如A消费者5msg/秒,B消费者10msg/s。

此时如果投放30条msg,B:15msg1.5秒处理完,A:15msg3秒处理完。

应优化为,B:20msg2秒处理完,A:10msg2秒处理完。

交换机

image-20220911150546783

三种模式介绍

发布订阅(FANOUT)

image-20220911150913025

image-20220911150922160

路由模式(Direct)

image-20220911151301231

主题模式(Topic)

quick.orange.rabbit 只会匹配  *.orange.* 和 *.*.rabbit ,进到Q1和Q2
lazy.orange.elephant 只会匹配 *.orange.* 和 lazy.#,进到Q1和Q2
quick.orange.fox 只会匹配 *.orange.*,进入Q1
lazy.brown.fox 只会匹配azy.#,进入Q2
lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次)

quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理

lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2

三种模式总结

FANOUT

image-20220911150913025

Producer--->Queue --(---->Consumer(msg*10)
                    (---->Consumer(msg*10)
生产者发送消息到交换机中,绑定了该叫交换机的所有队列能获取相同的信息

Direct

image-20220911151301231

         |->Queue(key(D))---->Consumer(A)
         |  
Producer--->Queue(key{A,B,C}) ------>Consumer(B)
↓
msg(key{A})
msg(key{B})
msg(key{C})
msg(key{D})



生产者发送4条信息附带4个key到交换机中
每一个队列都设置routingKey,每个队列只能获取自己对应routingKey的那条信息
如上逻辑:
Consumer(A)输出:msg(key{D})
Consumer(B)输出:msg(key{A})  msg(key{B})   msg(key{C})

Topic

image-20220911151331597

逻辑对比Direct新增了通配符,* 代表一个词,#代表1个或多个词,案例看下方

quick.orange.rabbit 只会匹配  *.orange.* 和 *.*.rabbit ,进到Q1和Q2
lazy.orange.elephant 只会匹配 *.orange.* 和 lazy.#,进到Q1和Q2
quick.orange.fox 只会匹配 *.orange.*,进入Q1
lazy.brown.fox 只会匹配azy.#,进入Q2
lazy.pink.rabbit 只会匹配 lazy.#和*.*.rabbit ,同个队列进入Q2(消息只会发一次)

quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理

lazy.orange.male.rabbit,只会匹配 lazy.#,进入Q2
         |->Queue(key(*.orange.*))---->Consumer(A)
         |  
Producer--->Queue(key(*.*.rabbit,lazy.#))------>Consumer(B)
↓
msgA(key{quick.orange.rabbit})
msgB(key{lazy.brown.fox})



生产者发送4条信息附带4个key到交换机中
每一个队列都设置routingKey,每个队列只能获取自己对应routingKey的那条信息
如上逻辑:
Consumer(A)输出:msgA
Consumer(B)输出:msgA,msgB

整合AMQP

 <!--引入AMQP-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 代码库 -->
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

yml配置文件修改

#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin

RabbitMQConfig文件

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "order_exchange";

    public static final String QUEUE = "order_queue";

    /**
     * topic 交换机
     * @return
     */
    @Bean
    public Exchange orderExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue orderQueue(){
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 交换机和队列绑定关系
     */
    @Bean
    public Binding orderBinding(Queue queue, Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

消息消费者

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

    /**
     * RabbitHandler 会自动匹配 消息类型(消息自动确认)
     * @param msg
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("监听到消息:消息内容:"+message.getBody());
    }
}

消息生产者-测试类

@SpringBootTest
class DemoApplicationTests {
    
  @Autowired
  private RabbitTemplate template;
    
  @Test
  void send() {
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");
  }
}

可靠性投递

confirmCallback(生产者到交换机)

#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated
    @Test
    void testConfirmCallback() {
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             *
             * @param correlationData 配置
             * @param ack 交换机是否收到消息,true是成功,false是失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback==============>");
                System.out.println("correlationData==============>correlationData=" + correlationData);
                System.out.println("ack==================>ack=" + ack);
                System.out.println("cause==============>cause=" + cause);

                if (ack) {
                    System.out.println("发送成功");
                } else {
                    System.out.println("发送失败,记录到日志或者数据库");
                }

            }
        });

        template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME + "ccl", "order.new", "新订单");
    }
发送失败,记录到日志或者数据库

returnCallback(交换机到队列)

#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated

    #开启二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #true:如交换机处理消息到路由失败,则返回给生产者
    template:
      mandatory: true
  @Test
  void testReturnCallback() {
    //为true,则交换机处理消息到路由失败,则会返回给生产者
    //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
      @Override
      public void returnedMessage(ReturnedMessage returned) {
        int code = returned.getReplyCode();
        System.out.println("code="+code);
        System.out.println("returned="+returned.toString());
      }
    });
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","新订单来啦11");
  }
code = 312
returned.toString() = ReturnedMessage [message=(Body:'新订单ReturnCallback' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=order_exchange, routingKey=xdclass.order.new]

ACK

image-20220911161423207

#消息队列
spring:
  rabbitmq:
    host: 101.33.219.225
    port: 5672
    virtual-host: /dev
    password: password
    username: admin
    #开启二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated

    #开启二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #true:如交换机处理消息到路由失败,则返回给生产者
    template:
      mandatory: true

    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual

ACK + DeliveryTag

   @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("body="+body);

        //告诉broker,消息已经被确认
//        channel.basicAck(msgTag,false);

        //告诉broker,消息拒绝确认
        channel.basicNack(msgTag,false,true);

        channel.basicReject(msgTag,true);
    }

TTL死信+延迟队列

TTL

TTL介绍

image-20220911165613343

image-20220911165636472

管控台消息TTL测试

延迟队列介绍和应用场景

高可用集群

普通集群

默认的集群模式, 比如有节点 node1和node2、node3,三个节点是普通集群,但是他们仅有相同的元数据,即交换机、队列的结构;

案例:
消息只存在其中的一个节点里面,假如消息A,存储在node1节点,
消费者连接node1个节点消费消息时,可以直接取出来;

但如果 消费者是连接的是其他节点
那rabbitmq会把 queue 中的消息从存储它的节点中取出,并经过连接节点转发后再发送给消费者

问题:
假如node1故障,那node2无法获取node1存储未被消费的消息;
如果node1持久化后故障,那需要等node1恢复后才可以正常消费
如果ndoe1没做持久化后故障,那消息将会丢失

这个情况无法实现高可用性,且节点间会增加通讯获取消息,性能存在瓶颈

项目中springboot+amqp里面需要写多个节点的配置,比如下面

spring.rabbitmq.addresses = 192.168.1.1:5672,192.168.1.2:5672,192.168.1.3:5672

该模式更适合于消息无需持久化的场景,如日志传输的队列
erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信

image-20220911184213610

image-20220911184219807

集群搭建

#节点一,主节点,创建-v映射目录
docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168  -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/1/log:/var/log/rabbitmq rabbitmq:management

#节点二,创建-v映射目录
docker run -d --hostname rabbit_host2 --name rabbitmq2  -p 15673:15672 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/2/log:/var/log/rabbitmq rabbitmq:management

#节点三,创建-v映射目录
docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 15674:15672 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_NODENAME=rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=xdclass.net168 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_xdclass' --privileged=true -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq -v /usr/local/rabbitmq/3/log:/var/log/rabbitmq rabbitmq:management
--hostname 自定义Docker容器的 hostname

--link 容器之间连接,link不可或缺,使得三个容器能互相通信

--privileged=true 使用该参数,container内的root拥有真正的root权限,否则容器出现permission denied

-v 宿主机和容器路径映射

参数 RABBITMQ_NODENAME,缺省 Unix*: rabbit@$HOSTNAME
参数 RABBITMQ_DEFAULT_USER=admin
参数 RABBITMQ_DEFAULT_PASS=xdclass.net168

Erlang Cookie 值必须相同,也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同, 相当于不同节点之间通讯的密钥,erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信

配置集群

节点一配置集群
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

节点二加入集群,--ram是以内存方式加入,忽略该参数默认为磁盘节点。
docker exec -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
rabbitmqctl start_app
exit

节点三加入集群,--ram是以内存方式加入,忽略该参数默认为磁盘节点。
docker exec -it rabbitmq3 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit_host1
rabbitmqctl start_app
exit

#查看集群节点状态,配置启动了3个节点,1个磁盘节点和2个内存节点

rabbitmqctl cluster_status

配置文件

#配置文件修改
#消息队列
spring:
  rabbitmq:
    addresses: 101.33.219.225:5672,101.33.219.225:5673,101.33.219.225:5674
    virtual-host: /dev
    password: xdclass.net168
    username: admin
    #开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated


    #开启消息二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true

    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual

mirror镜像集群

队列做成镜像队列,让各队列存在于多个节点中
和普通集群比较大的区别就是【队列queue的消息message 】会在集群各节点之间同步,且并不是在 consumer 获取数据时临时拉取,而普通集群则是临时从存储的节点里面拉取对应的数据

结论:
实现了高可用性,部分节点挂掉后,不影响正常的消费
可以保证100%消息不丢失,推荐3个奇数节点,结合LVS+Keepalive进行IP漂移,防止单点故障

缺点:由于镜像队列模式下,消息数量过多,大量的消息同步也会加大网络带宽开销,适合高可用要求比较高的项目
过多节点的话,性能则更加受影响
erlang.cookie是erlang的分布式token文件,集群内各个节点的erlang.cookie需要相同,才可以互相通信

策略policy介绍

rabbitmq的策略policy是用来控制和修改集群的vhost队列和Exchange复制行为
就是要设置哪些Exchange或者queue的数据需要复制、同步,以及如何复制同步
ha-mode: 指明镜像队列的模式,可选下面的其中一个
  all:表示在集群中所有的节点上进行镜像同步(一般都用这个参数)
  exactly:表示在指定个数的节点上进行镜像同步,节点的个数由ha-params指定
  nodes:表示在指定的节点上进行镜像同步,节点名称通过ha-params指定
  
ha-sync-mode:镜像消息同步方式 automatic(自动),manually(手动)

image-20220911184835084

image-20220911184858971

配置文件

#消息队列
spring:
  rabbitmq:
    addresses: 101.33.219.225:5672,101.33.219.225:5673,101.33.219.225:5674
    virtual-host: /dev
    password: xdclass.net168
    username: admin
    #开启消息二次确认,生产者到broker的交换机
    publisher-confirm-type: correlated


    #开启消息二次确认,交换机到队列的可靠性投递
    publisher-returns: true
    #为true,则交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true

    #消息手工确认ACK
    listener:
      simple:
        acknowledge-mode: manual

标签:队列,rabbitmq,交换机,消息,rabbit,RabbitMq,节点
来源: https://www.cnblogs.com/youngleesin/p/16684642.html