其他分享
首页 > 其他分享> > RabbitMQ保姆级教程

RabbitMQ保姆级教程

作者:互联网

文章目录


前言

提示:RaabitMQ消息队列的学习。


一、MQ是什么?

1.1 AMQP

二、在Linux安装RabbitMQ

2.1 安装

	1. 我们把erlang环境与rabbitMQ 安装包解压到Linux
	2. rpm -ivh erlang安装包
	3. yum install socat -y 安装依赖 / rpm -ivh socat依赖包 --force --nodeps
	4. rpm -ivh rabbitmq安装包

2.2 RabbitMQ启动命令

	1. 开启服务 /sbin/service rabbitmq-server start  / service rabbitmq-server start 
	2. 停止服务 service rabbitmq-server stop 
	3. 重启服务 service rabbitmq-server restart 

2.3 开启RabbitMQ 后台管理界面

	1.  rabbitmq-plugins enable rabbitmq_management
	1. 创建rabbitMQ账号
			rabbitmqctl add_user 用户名 密码
	2. 设置用户角色
			rabbitmqctl set_user_tags 用户名 administrator #设置用户名为超级管理员
	3. 设置用户权限
			rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
	4. 查看rabbitmq的用户和角色
			rabbitmqctl list_users
	5. 登录rabbitMQ 界面: Linux虚拟机ip:15672 即可

2.3.1 登录rabbitMQ UI界面

记得开放15672端口
	访问 Linux虚拟机ip:15672 即可

输入账户密码后看到这个界面代表成功
在这里插入图片描述

2.3 Docker启动RabbitMQ

Docker安装

	1. docker pull rabbitmq:3-management
	2. 开启rabbitMQ
		docker run \
		 -e RABBITMQ_DEFAULT_USER=root \
		 -e RABBITMQ_DEFAULT_PASS=123456 \
		 --name mq \
		 --hostname mq1 \
		 -p 15672:15672 \
		 -p 5672:5672 \
		 -d \
 		 rabbitmq:3-management

2.4 常见消息模型

2.5 生产者(Producer) / 消费者(Consumer)

	<dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
    </dependencies>
1234567891011121314

在这里插入图片描述

	/**
 * 生产者:发消息
 */
public class Producer {
    //队列名称
    public static final String QUEUE_NAME="hello";
    //发消息
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP连接rabbitMQ队列
        factory.setHost("ip地址");
        //设置用户名密码
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来发消息
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化 默认false 信息存储在内存中
         * 3.该列队是否只供一个消费者进行消费,是否进行消息共享
         *   true:可以多个消费者消费
         *   false:只能一个消费者消费
         * 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
         *   true:自动删除
         *   false:不自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message="hello rabbitMQ";
        /**
         * 发送一个消息
         * 1.发送到哪个交换机
         * 2.路由的KEY值是哪个? 指的是本次队列的名称
         * 3.其他参数信息
         * 4.发送的消息体
         */

        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
        channel.close();
        connection.close();
    }
}
	/**
 * 消费者:接收消息
 */
public class Consumer {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP连接rabbitMQ队列
        factory.setHost("ip地址");
        //设置用户名密码
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来收消息
        Channel channel = connection.createChannel();
        //声明 接收消息的回调
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            //message:包含消息头和消息体,我们只想拿到消息体
            //若不进行转换,直接输出message我们拿到的则是地址
            String data = new String(message.getBody());
            System.out.println(new String(message.getBody()));
        };
        //声明 取消消费的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

2.6 工作队列模式(Work Queues)

	public class ProducerWorkQueue {
    //队列名称
    public static final String QUEUE_NAME="hello";
    //发消息
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP连接rabbitMQ队列
        factory.setHost("ip地址");
        //设置用户名密码
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来发消息
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化 默认false 信息存储在内存中
         * 3.该列队是否只供一个消费者进行消费,是否进行消息共享
         *   true:可以多个消费者消费
         *   false:只能一个消费者消费
         * 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除
         *   true:自动删除
         *   false:不自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for (int i = 1; i <= 10; i++) {
            //发消息
            String message=i+"hello rabbitMQ";
            /**
             * 发送一个消息
             * 1.发送到哪个交换机
             * 2.路由的KEY值是哪个? 指的是本次队列的名称
             * 3.其他参数信息
             * 4.发送的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕");
        }
        channel.close();
        connection.close();
    }
}
	/**
 * 消费者:接收消息
 */
public class ConsumerWorkQueues1 {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP连接rabbitMQ队列
        factory.setHost("ip地址");
        //设置用户名密码
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来收消息
        Channel channel = connection.createChannel();
        //声明 接收消息的回调
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            //message:包含消息头和消息体,我们只想拿到消息体
            //若不进行转换,直接输出message我们拿到的则是地址
            String data = new String(message.getBody());
            System.out.println(new String(message.getBody()));
        };
        //声明 取消消费的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
	/**
 * 消费者:接收消息
 */
public class ConsumerWorkQueues2 {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP连接rabbitMQ队列
       	factory.setHost("ip地址");
        //设置用户名密码
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来收消息
        Channel channel = connection.createChannel();
        //声明 接收消息的回调
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            //message:包含消息头和消息体,我们只想拿到消息体
            //若不进行转换,直接输出message我们拿到的则是地址
            String data = new String(message.getBody());
            System.out.println(new String(message.getBody()));
        };
        //声明 取消消费的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

    }
}

2.7 参数细节

	//消费者生成的队列
	channel.queueDeclare(QUEUE_NAME,(durable)true/false,false,false,null);
	//MessageProperties.PERSISTENT_TEXT_PLAIN:将消息进行持久化
	channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());  
	channel.queueDeclare(QUEUE_NAME,false,false,(autoDelete的参数位置)false,null);
	若开启了自动应答,rabbitMQ消息队列分配给消费者10个数据,只要消费者拿到消息队列的数据时,就会告诉消息队列,数据处理完毕。
	若当我们处理到第5个数据时,消费者出现了宕机,死掉了,则会出现数据丢失
	channel.basicConsume(QUEUE_NAME,(autoAck是否自动应答)false,deliverCallback,cancelCallback);

2.8 实现能者多劳

	/**
 * 消费者:接收消息
 */
public class ConsumerWorkQueues1 {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //开启不公平分发,能者多劳
        channel.basicQos(1);
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            String data = new String(message.getBody());
            System.out.println(new String(message.getBody()));
            //参数1:确认队列中那个具体的消息:
            	// 可以获取消息的id 
            	// 消息routingkey
            	// 交换机 exchange
            	// 消息和重传标志
            //参数2:是否开启多个消息同时确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        //关闭自动应答 false
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}
public class ConsumerWorkQueues2 {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //开启不公平分发,能者多劳
        channel.basicQos(1);
        //声明 接收消息的回调
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new String(message.getBody()));
            //手动确认消息:
            //参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //声明 取消消费的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        //关闭自动应答 false
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }

2.8.1 Ack手动应答防止数据丢失和消息拒收后重新发送

  1. 生产者
public class ProducerWorkQueue {
    //队列名称
    public static final String QUEUE_NAME="hello";
    //发消息
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (true){
            String msg = scanner.nextLine();
            channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());
            System.out.println("消息发送完毕");
        }
    }
}
  1. 消费者a
public class ConsumerWorkQueues1 {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来收消息
        Channel channel = connection.createChannel();
        //声明 接收消息的回调
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            //message:包含消息头和消息体,我们只想拿到消息体
            //若不进行转换,直接输出message我们拿到的则是地址
            String data = new String(message.getBody());
            System.out.println("消费者1===>"+new String(message.getBody()));
             try {
               int i=3/0;//模拟业务发生异常
               channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }catch (Exception e){
                System.out.println("拒收消息发生了异常");
                //拒收消息
                    //参数一:表示投递的消息标签
                    //参数二:是否开启多个消息同时确认
                    //参数三:是否重新给队列发送
               channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);
            }
        };
        //声明 取消消费的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        //关闭自动应答 false
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}
  1. 消费者b
public class ConsumerWorkQueues2 {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception{
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来收消息
        Channel channel = connection.createChannel();
        //声明 接收消息的回调
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            //message:包含消息头和消息体,我们只想拿到消息体
            //若不进行转换,直接输出message我们拿到的则是地址
            System.out.println("睡10秒");
            try {
                Thread.sleep(1000*10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new String(message.getBody()));
            //手动确认消息:
            //参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //声明 取消消费的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        //关闭自动应答 false
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

    }
}
  1. 发送 aa 消费者a接收
    在这里插入图片描述

在这里插入图片描述

  1. 发送bb消费者b接收,在消费者b睡眠过程中我们停止消费者b,来看看手动应答的结果
    在这里插入图片描述
    此时我们查看消费者a,出现了本应该是消费者b消费的消息bb
    在这里插入图片描述

2.8.2 预取值

	channel.basicQos(1);  0:轮询机制  1:能者多劳 若值>1代表当前队列的预取值,代表当前队列大概会拿到多少值

2.9 Publish/Subscribe 发布/订阅

在这里插入图片描述

	public class Provider {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定交换机,   参数一:交换机名称  参数二:交换机类型 fanout广播类型	
        //参数2:交换机类型也可使用  BuiltinExchangeType. 的方式来查看选择
        channel.exchangeDeclare("order", "fanout");
        channel.basicPublish("order","",null,"fanout type message".getBytes());
        channel.close();
        connection.close();
    }
}
	public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("order","fanout");
        //获取临时队列名称
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和队列
        channel.queueBind(queueName,"order","");
        channel.basicConsume(queueName,true,(consumerTag,message)->{
            System.out.println("消费者1===>"+new String(message.getBody()));
        },consumerTag -> System.out.println("取消消费消息"));
    }
}

2.10 Routing(路由) - Direct

在这里插入图片描述

routing值订阅模型-Direct(直连)

public class Provider {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //通过信道声明交换机, 参数一:交换机名称  参数二:direct 路由模式
        channel.exchangeDeclare("logsExchange","direct");
        //发送消息 参数一:发送信息到的交换机名称
        //       参数二:绑定路由 发送给队列的那个路由key,
        //只有当队列的路由key与交换机的路由key相对应时,队列才会接受到消息
        channel.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 发送了消息".getBytes());
        channel.close();
        connection.close();
    }
}
public class Consumer1 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs","direct");
        //获取临时队列名
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:路由key,若消费者的路由key与生产者的路由key相同则可以收到消息
        channel.queueBind(queueName,"logsExchange","infoRouting");
        channel.queueBind(queueName,"logsExchange","msgRouting");
        channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
    }
}
public class Consumer2 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs","direct");
        //获取临时队列名
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,"logs","error");
        channel.queueBind(queueName,"logs","msg");
        channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
    }
}

2.11 Routing(路由)- Topic

在这里插入图片描述

	#通配符
	* (star) can substitute for exactly one word :匹配一个词
	# (hash) can substitute for zero or more words :匹配一个或多个词
public class Provider {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //通过信道声明交换机, 参数一:交换机名称  参数二:topic 动态路由
        channel.exchangeDeclare("order","topic");
        String routingKey="user.order";
        //发送消息 参数一:发送信息到的交换机名称  参数二:绑定路由 发送给队列的那个路由key
        channel.basicPublish("order",routingKey,null,("routing logs topic发送了消息"+routingKey).getBytes());
        channel.close();
        connection.close();
    }
}
public class Consumer1 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("order","topic");
        //获取临时队列名
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由key
        channel.queueBind(queueName,"order","user.*");
        channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
    }
}
public class Consumer2 {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("order","topic");
        //获取临时队列名
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列:参数一:临时队列名称 参数二:绑定的交换机名称 参数三:动态通配符路由key
        channel.queueBind(queueName,"order","user.#");
        channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));
    }
}

三、进阶篇 高级特性

3.1 死信队列

	死信,顾名思义就是无法被消费的信息,字面意思可以这样理解,一般来说,producer将消息投递到queue里,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,自然就有了死信队列
	为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
public class TTLProvider {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("账户");
        factory.setPassword("密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //发送死信 设置TTL过期时间
        AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            String msg=""+i;
            channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,msg.getBytes());
        }
        System.out.println("结束发送");
    }
}
public class TTLConsumer1 {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("账户");
        factory.setPassword("密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //声明普通队列
        HashMap<String, Object> map = new HashMap<>();
        //当消息被拒绝接受/未被消费 会将消息转发到死信队列
        //正常队列设置死信交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信队列的routingKey
        map.put("x-dead-letter-routing-key","dead");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机与普通队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
        DeliverCallback deliverCallback=( consumerTag, message)->{
            System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
        };
        CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
    }
}
public class TTLConsumer2 {
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("账户");
        factory.setPassword("密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        DeliverCallback deliverCallback=( consumerTag, message)->{
            System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
        };
        CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }
}

3.1.1 死信队列实战:消息TTL过期

在这里插入图片描述

@Configuration
public class RabbitMQConfiguration {
    //普通交换机
    public static final String X_EXCHANGE="X";
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE="Y";
    //普通队列
    public static final String QUEUE_A="QA";
    public static final String QUEUE_B="QB";
    //死信队列
    public static final String DEAD_QUEUE_D="QD";
    //声明普通x交换机
    @Bean
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //声明死信交换机
    @Bean
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明普通队列A TTL:10S
    @Bean
    public Queue queueA(){
        Map<String,Object> arg=new HashMap<>();
        //设置死信交换机
        arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信routingKey
        arg.put("x-dead-letter-routing-key","YD");
        //设置TTL过期时间
        arg.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arg).build();
    }
    //声明普通队列B TTL:40S
    @Bean
    public Queue queueB(){
        Map<String,Object> arg=new HashMap<>();
        //设置死信交换机
        arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信routingKey
        arg.put("x-dead-letter-routing-key","YD");
        //设置TTL过期时间
        arg.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arg).build();
    }
    //死信队列
    @Bean
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_QUEUE_D).build();
    }

    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}
@RestController
@RequestMapping("/ttl")
@Slf4j
public class TTLProvider {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/{msg}")
    public void sendMsg(@PathVariable("msg") String msg){
        log.info("当前发送时间:{}发送了一条消息",new Date().toString());
        rabbitTemplate.convertAndSend("X","XA","TTL消息延迟为10S,消息为===>"+msg);
        rabbitTemplate.convertAndSend("X","XB","TTL消息延迟为40S,消息为===>"+msg);
    }
}
@Component
@Slf4j
public class DeadLetterConsumer {
    @RabbitListener(queues = "QD")
    public void t1(Message message, Channel channel)throws Exception{
        log.info("收到死信队列的消息{},时间为{}",new String(message.getBody(),"UTF-8"),new Date().toString());
    }
}

3.1.2 死信队列实战:队列达到最大长度 设置正常队列最大长度

  1. 生产者
public class Producer {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        for (int i = 1; i <= 10; i++) {
            String msg=""+i;
            channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());
        }
    }
}
  1. 消费者a
    //设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
    map.put(“x-max-length”,6);
public class Consumer01 {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //声明普通队列
        HashMap<String, Object> map = new HashMap<>();
        //正常队列设置死信交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信队列的routingKey
        map.put("x-dead-letter-routing-key","dead");
        //设置当前正常队列的长度限制超过长度,后面的消息会进入到死信队列
        map.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机与普通队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
        DeliverCallback deliverCallback=( consumerTag, message)->{
            System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
        };
        CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
    }
}
  1. 消费者b
public class Consumer02 {
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        DeliverCallback deliverCallback=( consumerTag, message)->{
            System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
        };
        CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }
}

3.1.3 死信队列实战:消息被拒

  1. 生产者
public class Producer {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        for (int i = 1; i <= 10; i++) {
            String msg="info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());
        }
    }
}
  1. 消费者a
public class Consumer01 {
    //普通交换机名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("登录账户");
        factory.setPassword("登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //声明普通队列
        HashMap<String, Object> map = new HashMap<>();
        //正常队列设置死信交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信队列的routingKey
        map.put("x-dead-letter-routing-key","dead");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机与普通队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");
        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
        DeliverCallback deliverCallback=( consumerTag, message)->{
            String msg=new String(message.getBody());
            if("info5".equals(msg)){
                System.out.println("Consumer1接收消息===>"+msg+"此消息被拒绝");
                //此消息被拒接,是否重新放回正常队列, false:不放回 则会放到死信队列
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else {
                System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        };
        CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);
    }
}
  1. 消费者b
public class Consumer02 {
    //死信队列名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        DeliverCallback deliverCallback=( consumerTag, message)->{
            System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));
        };
        CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }
}

3.2 基于SpringBoot实现延迟队列

在这里插入图片描述

  1. 配置队列交换机
@Configuration
public class QueueConfig {
    @Bean("exchange")
    public DirectExchange exchange(){
        return new DirectExchange("msg");
    }
    @Bean("simpleQue")
    public Queue simpleQue(){
        HashMap<String, Object> map = new HashMap<>();
        //设置死信交换机
        map.put("x-dead-letter-exchange","dead");
        //设置死信路由
        map.put("x-dead-letter-routing-key","deadKey");
        //消息失效时间
        map.put("x-message-ttl",10000);
        return new Queue("simple",false,false,false,map);
    }
    @Bean
    public Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple,@Qualifier("exchange") DirectExchange msg)throws Exception{
        return BindingBuilder.bind(simple).to(msg).with("info");
    }
    @Bean("deadExchange")
    public DirectExchange exchange1(){
        return new DirectExchange("dead");
    }
    @Bean("deadQueue")
    public Queue deadQ(){
        return new Queue("deadQue",false,false,false,null);
    }
    @Bean
    public Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue,@Qualifier("deadExchange")DirectExchange dead){
        //绑定死信队列到死信交换机通过路由
        return BindingBuilder.bind(queue).to(dead).with("deadKey");
    }
}
  1. 生产者
@RestController
public class Provider {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/ttl/{message}")
    public void t1(@PathVariable String message){
        String queueName="simple";
        Date date = new Date();
        System.out.println(date);
            rabbitTemplate.convertAndSend("msg","info",message);
    }
}
  1. 消费者
@Component
public class Consumer {
    @RabbitListener(queues = "deadQue")
    public void hello(Message msg, Channel channel)throws Exception{
        System.out.println("接收到消息"+new String(msg.getBody()));
        Date date1 = new Date();
        System.out.println(date1);
    }
}

3.3 发布确认 高级特性

3.3.1 可靠性投递confirm模式

  1. 配置类
@Component
public class confirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";
    public static final String CONFIRM_QUEUE="confirm.queue";
    public static final String CONFIRM_ROUTING_KEY="confirm";
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return new Queue(CONFIRM_QUEUE);
    }
    @Bean
    public Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,@Qualifier("confirmQueue")Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}
  1. 实现回调接口:实现 RabbitTemplate.ConfirmCallback接口的confirm方法并且将其注入到rabbit模板的内部类中
@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //注入
    @PostConstruct //当所有注解执行完后,再执行这个注解
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }
    /**
     * 交换机确认回调方法
     *  发消息,交换机接收到了,回调
     *  参数
     *      1. correlationData:保存消息的ID及相关信息,这个消息是我们生产者手动传入的
     *      2. 交换机收到消息 true
     *      3. null
     */
    /**
     * 交换机确认回调方法
     *  发消息,交换机接收失败,回调
     *  参数
     *      1. correlationData:保存消息的ID及相关信息
     *      2. 交换机收到消息 false
     *      3. cause:失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id=correlationData!=null?correlationData.getId():"";
        if(b){
            log.info("交换机已经收到了ID为{}的消息",id);
        }else {
            log.info("交换机为收到了ID为{}的消息,原因是:{}",id,s);
        }
    }
}
  1. 生产者
@RestController
public class ConfirmProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{msg}")
    public void t1(@PathVariable String msg){
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,"嘿嘿嘿".getBytes(),correlationData);
    }
}
  1. 消费者
@Component
public class ConfirmConsumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
    public void consumer(Message message){
        System.out.println("高级特性确认发布消费者收到了消息===>"+new String(message.getBody()));
    }
}

3.3.2 可靠性投递return模式

@Component
@Slf4j
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //注入
    @PostConstruct //当所有注解执行完后,再执行这个注解
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 交换机确认回调方法
     *  发消息,交换机接收到了,回调
     *  参数
     *      1. correlationData:保存消息的ID及相关信息
     *      2. 交换机收到消息 true
     *      3. null
     */
    /**
     * 交换机确认回调方法
     *  发消息,交换机接收失败,回调
     *  参数
     *      1. correlationData:保存消息的ID及相关信息
     *      2. 交换机收到消息 false
     *      3. cause:失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id=correlationData!=null?correlationData.getId():"";
        if(b){
            log.info("交换机已经收到了ID为{}的消息",id);
        }else {
            log.info("交换机未收到了ID为{}的消息,原因是:{}",id,s);
        }
    }

    /**
     * 消息传递过程中 不可达 消费者的队列时将消息返回给生产者
     * 只有当消息 不可达 目的地的时候 才进行回调
     * 参数1:消息体
     * 参数2:回复代码
     * 参数3:回复原因
     * 参数4:交换机
     * 参数5:路由key
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        log.info("消息{},被交换机{}退回,原因是{},路由是{}",new String(message.getBody()),s1,s,s2);
    }

}

3.4 优先级队列

  1. 生产者
public class PriorityProducer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //设置优先级参数
        AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().priority(10).build();
        for (int i = 1; i <= 10; i++) {
            String msg="info"+i;
            if(i==5){
                channel.basicPublish("","hi",build,msg.getBytes());
            }else {
                channel.basicPublish("","hi",null,msg.getBytes());
            }
        }
    }
}
  1. 消费者
public class PriorityConsumer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip地址");
        factory.setUsername("RabbitMQ登录用户名");
        factory.setPassword("RabbitMQ登录密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        HashMap<String, Object> map = new HashMap<>();
        //设置当前队列为优先级队列
        map.put("x-max-priority",10);
        channel.queueDeclare("hi",false,false,false,map);
        channel.basicConsume("hi",true,(consumerTag,message)->{
            System.out.println("优先级队列接收消息顺序===>"+new String(message.getBody()));
        },(consumerTag) -> System.out.println("取消回调"));
    }
}

3.5 消费端限流

	//设置每次确定一个消息
	channel.basicQos(0,1,false);
12
public class AckProvider {
    //队列名称
    public static final String QUEUE_NAME="hello_Ack";
    //发消息
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("用户");
        factory.setPassword("密码");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (true){
            String msg = scanner.nextLine();
            channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());
            System.out.println("消息发送完毕");
        }
    }
}
public class AckConsumer2 {
    //队列名称,接收此队列的消息
    public static final String QUEUE_NAME="hello_Ack";

    public static void main(String[] args) throws Exception{
        //创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("用户");
        factory.setPassword("密码");
        //创建连接
        Connection connection = factory.newConnection();
        //通过连接来获取 信道来收消息
        Channel channel = connection.createChannel();
        //声明 接收消息的回调
        DeliverCallback deliverCallback=(consumerTag, message)-> {
            //message:包含消息头和消息体,我们只想拿到消息体
            //若不进行转换,直接输出message我们拿到的则是地址
            System.out.println(new String(message.getBody()));
            try {
                Thread.sleep(1000*5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //手动确认消息:
            //参数1:确认队列中那个具体的消息 参数2:是否开启多个消息同时确认
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };
        //声明 取消消费的回调
        CancelCallback cancelCallback=consumerTag->{
            System.out.println("消费消息被中断");
        };
        //每次只消费一个
		channel.basicQos(0,1,false);
		 /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动答应
         *   true:代表自动应答
         *   false:手动应答
         * 3.消费成者成功消费的回调
         * 4.消费者取消消费的回调
         */
        //关闭自动应答 false
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

    }
}

标签:教程,String,队列,factory,RabbitMQ,保姆,消息,public,channel
来源: https://blog.csdn.net/qq_43842093/article/details/121319307