2021-11-10
作者:互联网
ActiveMQ学习笔记一
文章目录
一、Linux安装ActiveMQ及应用
https://activemq.apache.org/官网下载apache-activemq-5.15.9-bin.tar.gz 用xftp上传到/opt目录解压在bin目录中启动./activemq start ,前提是需要对应jdk版本(我安装是对应需要jdk1.8)
启动命令:
启动 bin下面的命令
./activemq start
默认后台端口61616 前台端口8161
查看是否启动三种方式
1 netstat -anp |grep 61616
2 ps -ef | grep activemq | grep -v grep
3 lsof -i:61616
启动完成后访问路径:
(自己服务器的IP地址)xxx.xxx.xxx.xxx:8161/activemq
默认用户名密码 admin admin
它有两大模式:queue 和 topic
两大模式的比较
二、JMS
组成
JMS 组成结构和特点
provide 服务器
produce 生产者
consumer 消费者
message :消息头,消息体,消息属性
JMS message
消息头:几个重要的设置
JMSDestination 目的地:发送消息的目的地主要指Queue和Topic
JMSDeliveryModel 持久和非持久
JMSExpiration 消息过期设置
JMSPriority 消息优先级
JMSMessageID 消息唯一标志id:唯一识别每个消息的标志由mq产生
消息体 :封装具体的消息数据
5种消息体格式:
TextMessage 普通的字符串包含一个String
MapMessage 一个Map类型消息,key是String 而值是java基本类型
BytesMessage 二进制数组消息,包含一个byte[]
StreamMessage java数据流消息,用标准的流操作来顺序填充和读取
ObjectMess age 对象消息,包含一个可序列化的java对象
发送和接收消息体类型必须一致对应
消息属性:
如果需要除消息头字段以外的值,那么可以使用消息属性
识别/去重/重点标注等操作非常有用的方法
是一对kv对:如 textMessage.setStringProperty(“zs”,“vip”);
消息可靠性 : (持久性 ,事务, 签收)
1 persisent 持久性:messageProducer.setDeliverymodel(DeliveryModel.NON_PERSISENT);//非持久化
DeliveryModel.PERSISENT持久化 默认策略就是持久化
注意:
持久化topic:一定要先运行一次消费者,也就是需要先订阅这个主题,然后再运行生产者发送消息
此时无论消费者是否在线都会接收到消息,不在线的话下次连接时候会把没有接收过的消息都接收回来。
队列:
1 非持久性:当服务器宕机 ,消息丢失
2 持久性:当服务器宕机,消息依然存在
3 队列默认策略就是持久化
//非持久化演示
生产者
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://xxx.xxx.xxx.xxx:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2 通过连接工厂,获得connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地
Queue queue = session.createQueue(QUEUE_NAME);
//创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//设置消息为非持久化
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISENT);
//messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 持久化
//通过使用messageProducer生产3条消息发送到mq队列
for (int i = 1; i <=3; i++) {
//创建消息
TextMessage textMessage = session.createTextMessage("message" + i);
//通过messageProducer生产者发送消息给mq
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("***********MQ消息发送完成");
}
}
消费者
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.24.132:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("******我是2号消费者");
//1 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2 通过连接工厂,获得connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建会话session
// 两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地
Queue queue = session.createQueue(QUEUE_NAME);
//创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
//通过监听的方式来接收消息
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("**********MessageListener收到消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
–事务偏生产者,签收偏消费者
2 事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
如果是false:不开启事务,只要执行send就进入到队列中去,那么第二个签收参数设置需要有效
如果是true,开启事务,先执行send再执行commit,消息才被真正提交到队列中,消息需要批量发送需要缓冲区处理
生产者--------如果是false 则send的时候就提交成功到目的地,如果是true,一定需要提交commit,出错可以回滚rollback,如果没有commit那么消息就没有发送到目的地。
消费者 ---------如果设置为false,则消息只被消费一次,如果是true,一定需要设置commit,如果没有写commit那么消息会被重复消费,
3、签收
非事务下的签收:
1 自动签收(默认)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
2 手动签署 :需要显示的调用acknowledge方法
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
接受完消息后 调用textMessage.acknowledge();
如果没有收到消息后没有签收会出现重复消费问题
3 可重复签收:了解即可
事务下的签收:
生产事务开始,只有commit后才能将全部消费变为已消费
当一个事务被成功提交则消息被自动签收,如果事务回滚,则消息会被再次传送,
事务与签收的关系:事务比签收更大,只要是开始事务,不管有没有显示调用acknowledge方法,只要提交就默认是自动签收
如果开启事务但是没有commit调用,即使写了acknowledge方法签收也不会成功。
非事务会话中,消息何时被确认取决于创建会话时候的应答模式(acknowledgement model),如果自动签收就自动,如果手动签收就需要调用textMessage.acknowledge();
三、总结
队列
topic
jms
持久非持久
队列持久订阅和非持久订阅
标签:11,10,事务,Session,签收,connection,session,2021,消息 来源: https://blog.csdn.net/weixin_48214611/article/details/121249054