其他分享
首页 > 其他分享> > 四、RabbitMQ消息发送和接收

四、RabbitMQ消息发送和接收

作者:互联网

1、Java发送和接收Queue的消息

1.1 创建Maven工程01-rabbitmq-send-java

添加Maven依赖

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.1.1</version>
  </dependency>
</dependencies>

1.2 编写消息发送类

1)生产者连接到RabbitMQ Broker,建立一个连接( Connection)开启一个信道(Channel)

2)生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等

3)生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等

4)生产者通过路由键将交换器和队列绑定起来

5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。

6)相应的交换器根据接收到的路由键查找相匹配的队列。

7)如果找到,则将从生产者发送过来的消息存入相应的队列中。

8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者

9)关闭信道。

10)关闭连接。

package com.taiping.queue;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    public static ConnectionFactory getConnectionFactory(){
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置RabbitMQ的主机IP
        connectionFactory.setHost("10.6.36.49");
        //设置RabbitMQ的端口号
        //在使用浏览器可以访问,但是用程序就访问不到了。是因为浏览器访问的端口是15672。
        //但是用java程序连接,端口就变成了5672。
        connectionFactory.setPort(5672);
        //设置访问用户名
        connectionFactory.setUsername("root");
        //设置访问密码
        connectionFactory.setPassword("root");
        
        return connectionFactory;
        
    }
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接工厂对象
        ConnectionFactory connectionFactory = getConnectionFactory();
        
        //实例化链接对象
        Connection connection = connectionFactory.newConnection();
        //实例化通道对象
        Channel channel=connection.createChannel();
        
        String message ="老铁666~~~";
        
        //声明队列 
        channel.queueDeclare("01_Queue", true, false, false, null);
        //发送消息到指定队列
        channel.basicPublish("","01_Queue",null,message.getBytes("UTF-8"));
        
        System.out.println("消息发送成功: "+message);
        channel.close();
        connection.close();
    }
}

1.3 创建Maven工程01-rabbitmq-receive-java

添加Maven依赖

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.1.1</version>
  </dependency>
</dependencies>

声明队列属性介绍

channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)

 

注意:
a)声明队列时,如果已经存在则放弃声明,如果不存在则会声明一个新队列;
b)队列名可以任意取值,但需要与消息接收者一致。

 

1) queue: 队列的名称 ;

2)durable: 是否持久化 ;

3) exclusive: 是否排外的 ;

 

注意2点

排他队列是 基于连接(Connection) 可见的,同个连接(Connection)的不同管道 (Channel) 是可以同时访问同一连接创建的排他队列 。其他连接是访问不了的 ,强制访问将报错:

以下声明是没问题的:

 

Channel channel = connection.createChannel();
Channel channel2 = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, true, false, null);
channel2.queueDeclare(QUEUE_NAME, false, true, false, null);
//如果是不同的 connection 创建的 channel 和 channel2,那么以上的
//channel2.queueDeclare()是会报错的!!!!!!

"首次" 是指如果某个连接(Connection)已经声明了排他队列,其他连接是不允许建立同名的排他队列的。这个与普通队列不同:即使该队列是持久化的(durable = true),一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。

4) autoDelete: 是否自动删除 ;如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。

5) arguments: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。

1.4 编写消息接收类

1)消费者连接到RabbitMQ Broker ,建立一个连接(Connection),开启一个信道(Channel) 。

2)消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,

3)等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。

4)消费者确认(ack) 接收到的消息。

5)RabbitMQ 从队列中删除相应己经被确认的消息。

6)关闭信道。

7)关闭连接package com.taiping.queueimport java.io.IOException;import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receive {
    public static ConnectionFactory getConnectionFactory(){
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置RabbitMQ的主机IP
        connectionFactory.setHost("10.6.36.49");
        //设置访问用户名
        connectionFactory.setUsername("root");
        //设置访问密码
        connectionFactory.setPassword("root");
        
        return connectionFactory;
        
    }
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取链接工厂对象
        ConnectionFactory connectionFactory = getConnectionFactory();
        //实例化链接对象
        Connection connection = connectionFactory.newConnection();
        //实例化信道对象
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare("01_Queue", true, false, false, null);
        //消费消息
        boolean autoAck = true;
        String consumerTag = "";
        //接收消息
        //参数1 队列名称
        //参数2 是否自动确认消息 true表示自动确认 false表示手动确认
        //参数3 为消息标签 用来区分不同的消费者这里暂时为""
        // 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)
        channel.basicConsume("01_Queue", autoAck, consumerTag, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String bodyStr = new String(body, "UTF-8");
                System.out.println(bodyStr);
            }
         
        });
        //channel.close();
        //connection.close();
//接收消息会持续监听,不能关闭channel和connection
} }

 

标签:connectionFactory,队列,RabbitMQ,发送,rabbitmq,接收,com,channel
来源: https://www.cnblogs.com/ljk-shm-0208/p/14662303.html