RabbitMQ学习
作者:互联网
RabbitMQ学习
使用场景
-
消息队列解决什么问题?
- 异步处理
- 应用解耦
- 流量削锋
- 日志处理
安装与配置
用户及vhost配置
添加用户
virtual host管理
开发指南
Simple简单队列
模型
P:消息生产者
红色:阶列
C:消息消费者
不足
耦合性高,生产者—消费者一一对应。队列名变更都得变理
工作队列
模型
为什么会出现工作队列
Simple队列是一一对应的,而且我们实际开发,生产者发送消息是不费力的,而消费者一般是跟业务相结合的。,消息者接收到消息之后就需要处理。需要花费时间。队列就需要更多的消费者。
现象:
消费者1和2处理的消息是一样的
消费者1:奇数
消费者2:偶数
其实是一个轮询分发(roun-robin)
公平分发(Fail dispatch)
使用公平发分要关闭autoACK,改成手动。
公共的消费类
public class BHFailCustomConsumer extends DefaultConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(BHFailCustomConsumer.class);
private String consumerName = "defaultConsumer";
private long delayTime = 100L;
public BHFailCustomConsumer(Channel channel,long delayTime, String ...name) {
super(channel);
try {
//接收消息,在没有应答前只接收1条消息
channel.basicQos(1);
} catch (IOException e) {
e.printStackTrace();
}
this.delayTime = delayTime;
if(name!=null) {
this.consumerName = name[0];
}else{
this.consumerName = "defaultConsumer "+ UUID.randomUUID().toString();
}
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
LOGGER.debug(this.consumerName +"_"+msg);
try {
Thread.sleep(delayTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动应答
getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}
C1,C2,使用不是的delayTime延迟
Connection connection = messageBrokerHelper.getConnection();
Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME);
Consumer consumer = new BHFailCustomConsumer(channel,delayTime,name);
try {
LOGGER.info("Consumer {} waiting consume,delay time {}",name,delayTime);
//关掉自动应答,由消费者手动处理应答
channel.basicConsume(QUEUE_NAME,false,consumer);
} catch (IOException e) {
LOGGER.debug("consumer listener failure",e);
}
W 生产者
public void sender(int senderCount){
Connection connection = messageBrokerHelper.getConnection();
Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME);
try {
int senderTime = senderCount < 1 ? 1 : senderCount;
for (int i=0;i<senderTime;i++) {
String body = "hello,rabbit mq" + i;
channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}finally {
messageBrokerHelper.closeChannel(channel);
messageBrokerHelper.closeConnection(connection);
}
}
消息应答与持久化
订阅模式
模型
- 一个生产者,多个消费者,每个消费者有自己报价列
- 生产者没有直接把消息发送到队列,而是发到了交换机,转发器exchange
- 每个队列都要绑定到交换机上
- 生产者发送的消息,经过交换机,到达队列。实现一个消息被多个消费者所消费。
场景:
注册->邮件->短信
生产者
消费者
exchange(交换机 转发器)
接收生产者消息,并接收到的消转发给队列
fanout:不处理路由键
Direct:处理路由键
路由模式
Topic模式
‘# 匹配一个或者多个
*匹配一个
RPC模式
消息确认机制(事务+confirm)
两种方式:
AMQP实现了事务机制
Confirm模式
事务机制
-
txselect
用户将当前channel设置成transation模式
-
txCommit
用于提交事务
- txRollback
回滚事务
Confirm模式
生产者的实现原理
开启confirm模式
标签:消费者,队列,RabbitMQ,学习,生产者,delayTime,消息,channel 来源: https://blog.csdn.net/mysmnyc/article/details/120424397