其他分享
首页 > 其他分享> > RabbitMQ(点对点直连 and 参数说明)

RabbitMQ(点对点直连 and 参数说明)

作者:互联网

核心依赖:

<!--引入rabbitmq相关依赖-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

第一种模型:直连

 

 在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱, 可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

生产者:

package com.eddie.helloworld;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {

    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

//        //创建连接mq的连接工厂对象
//        ConnectionFactory connectionFactory = new ConnectionFactory();
//        //设置连接rabbitmq主机
//        connectionFactory.setHost("192.168.2.2");
//        //设置端口号
//        connectionFactory.setPort(5672);
//        //设置连接那个虚拟主机
//        connectionFactory.setVirtualHost("/ems");
//        //设置访问虚拟主机的用户名和密码
//        connectionFactory.setUsername("ems");
//        connectionFactory.setPassword("123");
//
//        //获取连接对象
//        Connection connection = connectionFactory.newConnection();

        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列
        //参数1:  队列名称 如果队列不存在自动创建
        //参数2:  用来定义队列特性是否要持久化 true 持久化队列   false 不持久化(服务重启后队列还保存,但不保存消息)
        //参数3:  exclusive 是否独占队列  true 独占队列   false  不独占
        //参数4:  autoDelete: 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
        //参数5:  额外附加参数
        channel.queueDeclare("hello",true,false,true,null);

        //发布消息
        //参数1: 交换机名称 参数2:队列名称  参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费)  参数4:消息的具体内容
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        
//        channel.close();
//        connection.close();
        //调用工具类
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

  

消费者:

package com.eddie.helloworld;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    //生产消息
    public static void main(String[] args) throws IOException, TimeoutException {

//        //创建连接mq的连接工厂对象
//        ConnectionFactory connectionFactory = new ConnectionFactory();
//        //设置连接rabbitmq主机
//        connectionFactory.setHost("192.168.2.2");
//        //设置端口号
//        connectionFactory.setPort(5672);
//        //设置连接那个虚拟主机
//        connectionFactory.setVirtualHost("/ems");
//        //设置访问虚拟主机的用户名和密码
//        connectionFactory.setUsername("ems");
//        connectionFactory.setPassword("123");
//
//        //获取连接对象
//        Connection connection = connectionFactory.newConnection();
//
        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列
        //参数1:  队列名称 如果队列不存在自动创建
        //参数2:  用来定义队列特性是否要持久化 true 持久化队列   false 不持久化
        //参数3:  exclusive 是否独占队列  true 独占队列   false  不独占
        //参数4:  autoDelete: 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
        //参数5:  额外附加参数
        channel.queueDeclare("hello",true,false,true,null);


        channel.basicConsume("hello", true, new DefaultConsumer(channel){
            //最后一个参数:消息队列取出的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body): " + new String(body));
            }
        });

//        channel.close();
//        connection.close();
    }
}

封装的公共类:

package com.eddie.utiils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {
    private static ConnectionFactory connectionFactory;
    //private static Properties properties;
    static{
        //重量级资源  类加载执行之执行一次
        //创建连接mq的连接工厂对象
         connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq主机
        connectionFactory.setHost("192.168.2.2");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接那个虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");

    }

    //定义提供连接对象的方法
    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和关闭连接工具方法
    public static void closeConnectionAndChanel(Channel channel, Connection conn) {
        try {
            if(channel!=null) channel.close();
            if(conn!=null)   conn.close();
        } catch (Exception e) {
            e.printStackTrace();

        }
    }
}

第二种模型(work quene)

Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会
远远大于消息的消费速度。长此以往,消息就会堆积
越来越多,无法及时处理。此时就可以使用work 模型:
让多个消费者绑定到一个队列,共同消费队列中的消息。
队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

 角色:

P:生产者:任务的发布者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢

C2:消费者-2:领取任务并完成任务,假设完成速度快

 

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给 下一个使用者。平均而言,每个消费者都会收到相同数量 的消息。这种分发消息的方式称为循环。

假如生产者发送了10条消息,每个消费者将获取5条数据,其中一个消费者读取到第三条的时候发生宕机,则剩下的消息就会丢失,这样我们就需要关闭rabbitmq自动确认机制,手动去给一个确认标识,即可以解决宕机消息丢失问题,又可以产生能者多劳的效果

生产者:

package com.eddie.workqueues;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider2 {
    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {

        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列
        //参数1:  队列名称 如果队列不存在自动创建
        //参数2:  用来定义队列特性是否要持久化 true 持久化队列   false 不持久化(服务重启后队列还保存,但不保存消息)
        //参数3:  exclusive 是否独占队列  true 独占队列   false  不独占
        //参数4:  autoDelete: 是否在消费完成后自动删除队列  true 自动删除  false 不自动删除
        //参数5:  额外附加参数
        channel.queueDeclare("hello",true,false,true,null);

        //发布消息
        //参数1: 交换机名称 参数2:队列名称  参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费)  参数4:消息的具体内容
        for(int i = 0; i<20; i++){
            channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+"hello rabbitmq").getBytes());
        }
        //调用工具类
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

消费者1:

package com.eddie.workqueues;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();
        channel.basicQos(1); //每次只能消费一条消息
        channel.queueDeclare("hello",true,false,true,null);

        //参数1:队列名称    参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认
        channel.basicConsume("hello", false, new DefaultConsumer(channel){
            //最后一个参数:消息队列取出的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
                    IOException {
                System.out.println("111new String(body): " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }

}

消费者2:

package com.eddie.workqueues;

import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

import static java.lang.Thread.sleep;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();
        channel.basicQos(1); //每次只能消费一条消息
        channel.queueDeclare("hello",true,false,true,null);
        //参数1:队列名称    参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认
        channel.basicConsume("hello", false, new DefaultConsumer(channel){
            //最后一个参数:消息队列取出的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
                    IOException {
                try {
                    sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("222new String(body): " + new String(body));
                //手动确认  参数1:手动确认消息标识   参数2:是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }

}

设置通道一次只能消费一个消息 关闭消息的自动确认,开启手动确认消息

标签:直连,connectionFactory,队列,点对点,RabbitMQ,参数,import,com,channel
来源: https://blog.csdn.net/qq_41835151/article/details/120497215