RabbitMQ(点对点直连 and 参数说明)
作者:互联网
核心依赖:
<!--引入rabbitmq相关依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
第一种模型:直连
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱, 可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者:
package com.eddie.helloworld;
import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
// //创建连接mq的连接工厂对象
// ConnectionFactory connectionFactory = new ConnectionFactory();
// //设置连接rabbitmq主机
// connectionFactory.setHost("192.168.2.2");
// //设置端口号
// connectionFactory.setPort(5672);
// //设置连接那个虚拟主机
// connectionFactory.setVirtualHost("/ems");
// //设置访问虚拟主机的用户名和密码
// connectionFactory.setUsername("ems");
// connectionFactory.setPassword("123");
//
// //获取连接对象
// Connection connection = connectionFactory.newConnection();
//通过工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1: 队列名称 如果队列不存在自动创建
//参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化(服务重启后队列还保存,但不保存消息)
//参数3: exclusive 是否独占队列 true 独占队列 false 不独占
//参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
//参数5: 额外附加参数
channel.queueDeclare("hello",true,false,true,null);
//发布消息
//参数1: 交换机名称 参数2:队列名称 参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费) 参数4:消息的具体内容
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
// channel.close();
// connection.close();
//调用工具类
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
消费者:
package com.eddie.helloworld;
import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
//生产消息
public static void main(String[] args) throws IOException, TimeoutException {
// //创建连接mq的连接工厂对象
// ConnectionFactory connectionFactory = new ConnectionFactory();
// //设置连接rabbitmq主机
// connectionFactory.setHost("192.168.2.2");
// //设置端口号
// connectionFactory.setPort(5672);
// //设置连接那个虚拟主机
// connectionFactory.setVirtualHost("/ems");
// //设置访问虚拟主机的用户名和密码
// connectionFactory.setUsername("ems");
// connectionFactory.setPassword("123");
//
// //获取连接对象
// Connection connection = connectionFactory.newConnection();
//
//通过工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1: 队列名称 如果队列不存在自动创建
//参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
//参数3: exclusive 是否独占队列 true 独占队列 false 不独占
//参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
//参数5: 额外附加参数
channel.queueDeclare("hello",true,false,true,null);
channel.basicConsume("hello", true, new DefaultConsumer(channel){
//最后一个参数:消息队列取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body): " + new String(body));
}
});
// channel.close();
// connection.close();
}
}
封装的公共类:
package com.eddie.utiils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
//private static Properties properties;
static{
//重量级资源 类加载执行之执行一次
//创建连接mq的连接工厂对象
connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("192.168.2.2");
//设置端口号
connectionFactory.setPort(5672);
//设置连接那个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
}
//定义提供连接对象的方法
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//关闭通道和关闭连接工具方法
public static void closeConnectionAndChanel(Channel channel, Connection conn) {
try {
if(channel!=null) channel.close();
if(conn!=null) conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
第二种模型(work quene)
Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会
远远大于消息的消费速度。长此以往,消息就会堆积
越来越多,无法及时处理。此时就可以使用work 模型:
让多个消费者绑定到一个队列,共同消费队列中的消息。
队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快
总结:默认情况下,RabbitMQ将按顺序将每个消息发送给 下一个使用者。平均而言,每个消费者都会收到相同数量 的消息。这种分发消息的方式称为循环。
假如生产者发送了10条消息,每个消费者将获取5条数据,其中一个消费者读取到第三条的时候发生宕机,则剩下的消息就会丢失,这样我们就需要关闭rabbitmq自动确认机制,手动去给一个确认标识,即可以解决宕机消息丢失问题,又可以产生能者多劳的效果
生产者:
package com.eddie.workqueues;
import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider2 {
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
//通过工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1: 队列名称 如果队列不存在自动创建
//参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化(服务重启后队列还保存,但不保存消息)
//参数3: exclusive 是否独占队列 true 独占队列 false 不独占
//参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
//参数5: 额外附加参数
channel.queueDeclare("hello",true,false,true,null);
//发布消息
//参数1: 交换机名称 参数2:队列名称 参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费) 参数4:消息的具体内容
for(int i = 0; i<20; i++){
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+"hello rabbitmq").getBytes());
}
//调用工具类
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
消费者1:
package com.eddie.workqueues;
import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
//通过工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
channel.basicQos(1); //每次只能消费一条消息
channel.queueDeclare("hello",true,false,true,null);
//参数1:队列名称 参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认
channel.basicConsume("hello", false, new DefaultConsumer(channel){
//最后一个参数:消息队列取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
IOException {
System.out.println("111new String(body): " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
消费者2:
package com.eddie.workqueues;
import com.eddie.utiils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import static java.lang.Thread.sleep;
public class Consumer2 {
public static void main(String[] args) throws IOException {
//通过工具类获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
channel.basicQos(1); //每次只能消费一条消息
channel.queueDeclare("hello",true,false,true,null);
//参数1:队列名称 参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认
channel.basicConsume("hello", false, new DefaultConsumer(channel){
//最后一个参数:消息队列取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws
IOException {
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("222new String(body): " + new String(body));
//手动确认 参数1:手动确认消息标识 参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
设置通道一次只能消费一个消息 关闭消息的自动确认,开启手动确认消息
标签:直连,connectionFactory,队列,点对点,RabbitMQ,参数,import,com,channel 来源: https://blog.csdn.net/qq_41835151/article/details/120497215