其他分享
首页 > 其他分享> > activemq 事务和签收

activemq 事务和签收

作者:互联网

activemq事务:

Connection类中的createSession有两个参数,第一个参数是是否开启事务(true/false);第二个参数是消息确认机制;当第一个参数设置为true,即开启事务;当开启事务时,activemq不会主动提交事务,需要我们手动提交。即需要额外执行commit方法;

生产者事务代码示例:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.JMSException;
 6 import javax.jms.MessageProducer;
 7 import javax.jms.ObjectMessage;
 8 import javax.jms.Queue;
 9 import javax.jms.Session;
10 
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class TestActiveMqProducerTransactionCanDelete {
14     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
15     private static final String QUEUE_NAME = "queue_01";
16 
17     public static void main(String[] args) {
18         // 创建连接工厂,按照给定的URL,采用默认用户名密码
19         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20         // 通过连接工厂 获取connection 并启动访问
21         Connection conn = null;
22         Session session = null;
23         try {
24             conn = activeMQConnectionFactory.createConnection();
25 
26             conn.start();
27             // 创建session会话 开启事务
28             session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
29             // 创建目的地 (具体是队列还是主题topic)
30             Queue queue = session.createQueue(QUEUE_NAME);
31             // 创建消息的生产者
32             MessageProducer messageProducer = session.createProducer(queue);
33             messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
34             // Byte类型的数据
35             ObjectMessage message = session.createObjectMessage();
36             User user = new User();
37             user.setAddress("嘉兴");
38             user.setName("Joy");
39             message.setObject(user);
40             message.setStringProperty("StringProperty", "我是 属性xxxxxxx");
41             messageProducer.send(message);
42 
43             messageProducer.close();
44 
45             // 模拟异常
46             // int a = 10 / 0;
47             session.commit();
48         } catch (Exception e) {
49             try {
50                 session.rollback();
51             } catch (JMSException e1) {
52                 e1.printStackTrace();
53             }
54             e.printStackTrace();
55         } finally {
56             try {
57                 if (session != null) {
58                     session.close();
59                 }
60                 if (conn != null) {
61                     conn.close();
62                 }
63             } catch (JMSException e) {
64                 e.printStackTrace();
65             }
66         }
67 
68         System.out.println("发送消息成功");
69     }
70 
71 }
View Code

消费者代码示例:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.JMSException;
 5 import javax.jms.MessageConsumer;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.Queue;
 8 import javax.jms.Session;
 9 
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 public class TestActiveMqConsumerTxCanDelete {
13     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
14     private static final String QUEUE_NAME = "queue_01";
15 
16     @SuppressWarnings("unchecked")
17     public static void main(String[] args) {
18         // 创建连接工厂,按照给定的URL,采用默认用户名密码
19         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
20         activeMQConnectionFactory.setTrustAllPackages(true);
21         // 通过连接工厂 获取connection 并启动访问
22         Connection conn = null;
23         Session session = null;
24         try {
25             conn = activeMQConnectionFactory.createConnection();
26 
27             conn.start();
28             // 创建session会话
29             session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
30             // 创建目的地 (具体是队列还是主题topic)
31             Queue queue = session.createQueue(QUEUE_NAME);
32 
33             // 创建消息的生产者
34             MessageConsumer messageConsumer = session.createConsumer(queue);
35             /**
36              * 同步阻塞方式(receive()) 订阅者或者接受者调用MessageConsumer的receive()方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞;
37              */
38             while (true) {
39                 ObjectMessage message = (ObjectMessage) messageConsumer.receive();
40                 System.out.println("*********************");
41                 if (message != null) {
42                     System.out.println(message.getStringProperty("StringProperty"));
43                     User user = (User) message.getObject();
44                     System.out.println(user);
45                 } else {
46                     break;
47                 }
48                 System.out.println("*********************");
49             }
50 
51             messageConsumer.close();
52             // 模拟异常
53             // int a = 10 / 0;
54             // 必须要加commit,这样才算正在被消费
55             session.commit();
56         } catch (JMSException e) {
57             try {
58                 session.rollback();
59             } catch (JMSException e1) {
60                 e1.printStackTrace();
61             }
62             e.printStackTrace();
63         } finally {
64             try {
65                 if (session != null) {
66                     session.close();
67                 }
68                 if (conn != null) {
69                     conn.close();
70                 }
71             } catch (JMSException e) {
72                 e.printStackTrace();
73             }
74 
75         }
76     }
77 
78 }
View Code

 

签收:

签收一般应用在消费者,对于生产者,无论是自动签收还是手动签收,消息都能进入到队列中;

在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务的优先级大于签收,当事务模式,如果事务没有提交,即使客户端签收也不会消费消息。非事务性会话中,消息何时被确认取决于创建会话时的应答模式(ACKNOWLEDGE),所以这里所讲的签收,前提是非事务模式;即Connection类中的createSession的两个参数,第一个参数是是否开启事务是false;第二个参数是设置签收模式;

签收模式:

1.Session.AUTO_ACKNOWLEDGE

当消息从MessageConsumer的receive方法返回或者从MessageListener接口的onMessage方法返回时,会话自动确认消息签收

2.Session.CLIENT_ACKNOWLEDGE

需要消费者客户端主动调用acknowledge方法签收消息,这种模式实在Session层面进行签收的,签收一个已经消费的消息会自动的签收这个Session已消费的所有消息:

例如一个消费者在一个Session中消费了5条消息,然后确认第3条消息,所有这5条消息都会被签收

3.Session.DUPS_OK_ACKNOWLEDGE(一般不建议使用)

这种方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.

这种方式会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息才可使用。(如果ActiveMQ再次传送同一消息,那么消息头中的JMSRedelivered将被设置为true)

4.Session.DUPS_OK_ACKNOWLEDGE(很少使用)

事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到

消费者自动签收Session.AUTO_ACKNOWLEDGE:

activemq 之hello world 显示的消费者签收都是自动签收,这里不再赘述;

消费者手动签收Session.CLIENT_ACKNOWLEDGE:

设置手动签收,需要设置签收(message.acknowledge());如果不设置签收,则后台管理会显示 消费未消费;如果又多个消费者,都是手动签收,其中又一个消费者没有设置签收(即没有message.acknowledge()),但是还是会显示所有消息已签收;

 

消费者签收代码示例:

 1 package com.mock.utils;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.JMSException;
 5 import javax.jms.MessageConsumer;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.Queue;
 8 import javax.jms.Session;
 9 
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 public class TestActiveMqConsumerTxCanDelete {
13     private static final String ACTIVEMQ_URL = "tcp://192.168.2.189:61616";
14     private static final String QUEUE_NAME = "queue_01";
15 
16     public static void main(String[] args) throws JMSException {
17         // 创建连接工厂,按照给定的URL,采用默认用户名密码
18         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
19         activeMQConnectionFactory.setTrustAllPackages(true);
20         // 通过连接工厂 获取connection 并启动访问
21         Connection conn = activeMQConnectionFactory.createConnection();
22 
23         conn.start();
24         // 创建session会话
25         Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
26         // 创建目的地 (具体是队列还是主题topic)
27         Queue queue = session.createQueue(QUEUE_NAME);
28 
29         // 创建消息的生产者
30         MessageConsumer messageConsumer = session.createConsumer(queue);
31         /**
32          * 同步阻塞方式(receive()) 订阅者或者接受者调用MessageConsumer的receive()方法来接受消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞;
33          */
34         while (true) {
35             ObjectMessage message = (ObjectMessage) messageConsumer.receive();
36             System.out.println("*********************");
37             if (message != null) {
38                 System.out.println(message.getStringProperty("StringProperty"));
39                 User user = (User) message.getObject();
40                 System.out.println(user);
41                 message.acknowledge();// 设置签收
42             } else {
43                 break;
44             }
45             System.out.println("*********************");
46         }
47 
48         messageConsumer.close();
49         session.close();
50         conn.close();
51     }
52 
53 }

 

标签:事务,Session,jms,签收,session,import,activemq,javax
来源: https://www.cnblogs.com/lixiuming521125/p/16566022.html