四、RabbitMQ消息发送和接收
作者:互联网
1、Java发送和接收Queue的消息
1.1 创建Maven工程01-rabbitmq-send-java
添加Maven依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.1</version> </dependency> </dependencies>
1.2 编写消息发送类
1)生产者连接到RabbitMQ Broker,建立一个连接( Connection)开启一个信道(Channel)
2)生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
3)生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
4)生产者通过路由键将交换器和队列绑定起来
5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
6)相应的交换器根据接收到的路由键查找相匹配的队列。
7)如果找到,则将从生产者发送过来的消息存入相应的队列中。
8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
9)关闭信道。
10)关闭连接。
package com.taiping.queue; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { public static ConnectionFactory getConnectionFactory(){ ConnectionFactory connectionFactory = new ConnectionFactory(); //设置RabbitMQ的主机IP connectionFactory.setHost("10.6.36.49"); //设置RabbitMQ的端口号 //在使用浏览器可以访问,但是用程序就访问不到了。是因为浏览器访问的端口是15672。 //但是用java程序连接,端口就变成了5672。 connectionFactory.setPort(5672); //设置访问用户名 connectionFactory.setUsername("root"); //设置访问密码 connectionFactory.setPassword("root"); return connectionFactory; } public static void main(String[] args) throws IOException, TimeoutException { //获取链接工厂对象 ConnectionFactory connectionFactory = getConnectionFactory(); //实例化链接对象 Connection connection = connectionFactory.newConnection(); //实例化通道对象 Channel channel=connection.createChannel(); String message ="老铁666~~~"; //声明队列 channel.queueDeclare("01_Queue", true, false, false, null); //发送消息到指定队列 channel.basicPublish("","01_Queue",null,message.getBytes("UTF-8")); System.out.println("消息发送成功: "+message); channel.close(); connection.close(); } }
1.3 创建Maven工程01-rabbitmq-receive-java
添加Maven依赖
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.1</version> </dependency> </dependencies>
声明队列属性介绍
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
注意:
a)声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
b)队列名可以任意取值,但需要与消息接收者一致。
1) queue: 队列的名称 ;
2)durable: 是否持久化 ;
- 当durable = false时,队列非持久化。因为队列是存放在内存中的,所以当RabbitMQ重启或者服务器重启时该队列就会丢失 ;
- 当durable = true时,队列持久化。当RabbitMQ重启后队列不会丢失。RabbitMQ退出时它会将队列信息保存到 Erlang自带的Mnesia数据库 中,当RabbitMQ重启之后会读取该数据库 ;
3) exclusive: 是否排外的 ;
- 当exclusive = true则设置队列为排他的。如果一个队列被声明为排他队列,该队列 仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
- 当exclusive = false则设置队列为非排他的,此时不同连接(Connection)的管道Channel可以使用该队列 ;
注意2点:
排他队列是 基于连接(Connection) 可见的,同个连接(Connection)的不同管道 (Channel) 是可以同时访问同一连接创建的排他队列 。其他连接是访问不了的 ,强制访问将报错:
以下声明是没问题的:
Channel channel = connection.createChannel(); Channel channel2 = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, true, false, null); channel2.queueDeclare(QUEUE_NAME, false, true, false, null); //如果是不同的 connection 创建的 channel 和 channel2,那么以上的 //channel2.queueDeclare()是会报错的!!!!!!
"首次" 是指如果某个连接(Connection)已经声明了排他队列,其他连接是不允许建立同名的排他队列的。这个与普通队列不同:即使该队列是持久化的(durable = true),一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
4) autoDelete: 是否自动删除 ;如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。
5) arguments: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。
1.4 编写消息接收类
1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel) 。
2)消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
3)等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
4)消费者确认(ack) 接收到的消息。
5)RabbitMQ 从队列中删除相应己经被确认的消息。
6)关闭信道。
7)关闭连接package com.taiping.queueimport java.io.IOException;import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Receive { public static ConnectionFactory getConnectionFactory(){ ConnectionFactory connectionFactory = new ConnectionFactory(); //设置RabbitMQ的主机IP connectionFactory.setHost("10.6.36.49"); //设置访问用户名 connectionFactory.setUsername("root"); //设置访问密码 connectionFactory.setPassword("root"); return connectionFactory; } public static void main(String[] args) throws IOException, TimeoutException { //获取链接工厂对象 ConnectionFactory connectionFactory = getConnectionFactory(); //实例化链接对象 Connection connection = connectionFactory.newConnection(); //实例化信道对象 final Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("01_Queue", true, false, false, null); //消费消息 boolean autoAck = true; String consumerTag = ""; //接收消息 //参数1 队列名称 //参数2 是否自动确认消息 true表示自动确认 false表示手动确认 //参数3 为消息标签 用来区分不同的消费者这里暂时为"" // 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库) channel.basicConsume("01_Queue", autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); //channel.close(); //connection.close();
//接收消息会持续监听,不能关闭channel和connection
} }
标签:connectionFactory,队列,RabbitMQ,发送,rabbitmq,接收,com,channel 来源: https://www.cnblogs.com/ljk-shm-0208/p/14662303.html