Rabbitmq同时处理多个消息(多线程)
作者:互联网
Rabbitmq同时处理多个消息(多线程)
文章目录
basicQos预取方法参数解析
- basicQos(int prefetchCount)
- basicQos(int prefetchCount, boolean global)
- basicQos(int prefetchSize, int prefetchCount, boolean global)
参数:
- prefetchSize:可接收消息的大小
- prefetchCount:处理消息最大的数量。
- global:是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的
实际中prefetchSize和global几乎不使用,rabbitmq也没有去实现其操作,不考虑。
basicConsumer消费方法参数解析
- basicConsumer(String queue, Consumer consumer)
- basicConsumer(String queue, boolean autoAck, Consumer consumer)
参数:
- queue:监听的队列名称
- autoAck:是否自动消费消息
- consumer:使用的消费者类
一, 失败不重试,直接确认
Consumer.java 消费者类
package com.lmc.mq.nospring;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
/**
* @author lmc
* @Description: TODO
* @Create 2021-09-07 22:06
* @version: 1.0
*/
public class Consumer {
private final static String QUEUE_NAME = "lmc-test"; //队列名称
public static void main(String[] args) {
initModule();
}
public static void initModule() {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx.xx.xx.xx"); //设置rabbitmq-server的地址
connectionFactory.setPort(5672); //使用的端口号
connectionFactory.setVirtualHost("/"); //使用的虚拟主机
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//由连接工厂创建连接
Connection connection = null;
try {
connection = connectionFactory.newConnection();
//通过连接创建信道
final Channel channel = connection.createChannel();
channel.basicQos(0, 3, true);
//创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
DefaultConsumer consumer = new DefaultConsumer(channel) {
//监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
MqMessageDispatcher.doDispatch(new String(body, "UTF-8"), channel, envelope);
}
};
//监听指定的queue。会一直监听。
//参数:要监听的queue、是否自动确认消息、使用的Consumer
channel.basicConsume(QUEUE_NAME, false, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
MqMessageDispatcher.java 多线程类:同时并发处理多个消息
package com.lmc.mq.nospring;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lmc
* @Description: TODO
* @Create 2021-09-07 22:45
* @version: 1.0
*/
public class MqMessageDispatcher {
public static Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);
public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
msgHandleService.shutdown();
}
});
}
public static void doDispatch(String message, Channel channel, Envelope envelope) {
msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
}
private static class MessageHandleTask implements Runnable {
String message;
Channel channel;
Envelope envelope;
public MessageHandleTask(String message, Channel channel, Envelope envelope) {
this.message = message;
this.channel = channel;
this.envelope = envelope;
}
@Override
public void run() {
long start = System.currentTimeMillis();
logger.info("Received message: " + message);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
// 手动确认消息,若自动确认则不需要写以下该行
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (IOException e) {
System.err.println("fail to confirm message:" + message);
}
}
}
}
二, 失败重试5次,再直接确认
MqMessageDispatcher.java
package com.lmc.mq.nospring;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author lmc
* @Description: TODO
* @Create 2021-09-07 22:45
* @version: 1.0
*/
public class MqMessageDispatcher {
public static final Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);
public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);
public static Map<String, Integer> cacheMap = new HashMap(5);
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
msgHandleService.shutdown();
}
});
}
public static void doDispatch(String message, Channel channel, Envelope envelope) {
msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
}
private static class MessageHandleTask implements Runnable {
String message;
Channel channel;
Envelope envelope;
public MessageHandleTask(String message, Channel channel, Envelope envelope) {
this.message = message;
this.channel = channel;
this.envelope = envelope;
}
@Override
public void run() {
int currentTimes = 0; // 当前重试次数
boolean isSuccess = false; // 消息是否处理成功
// 获取当前消息重试次数,(这种情况适合每条消息内容不一样,最好每条消息都有唯一标识)
if (cacheMap.containsKey(message)) {
currentTimes = cacheMap.get(message);
}else {
cacheMap.put(message, 0);
}
long start = System.currentTimeMillis();
logger.info("Received message: " + message);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
if (isSuccess) {
// 手动确认消息
logger.info("message[" + message + "] consumer success.(Ack)");
cacheMap.put(message, 0);
channel.basicAck(envelope.getDeliveryTag(), false);
}else {
if (currentTimes >= 5) {
// 手动确认消息,若自动确认则不需要写以下该行
logger.warn("message[" + message + "] consumer fail,have retry 5 times.(Ack)");
cacheMap.put(message, 0);
channel.basicAck(envelope.getDeliveryTag(), false);
}else {
// 处理失败,重试未5次,重新处理
cacheMap.put(message, ++currentTimes);
logger.warn("message[" + message + "] consumer fail,prepare to retry " + currentTimes + " times...(Nack)");
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
} catch (IOException e) {
System.err.println("fail to confirm message:" + message);
}
}
}
}
标签:java,多个,envelope,Rabbitmq,import,message,多线程,public,channel 来源: https://blog.csdn.net/lmchhh/article/details/120378340