其他分享
首页 > 其他分享> > spring 整合 actionMQ(一)

spring 整合 actionMQ(一)

作者:互联网

前段时间配置了rabbitMQ 主要是为了公司发送大批量邮件而服务,结果锦鲤说要用actionMQ,还能说什么,肯定是actionMq的走起了。

鉴于第一次配置、使用ActionMQ,所以进行详细的配置,记录。如果有最新的用法,或者跟详细、更深入的使用,我会不定期的更新,

期待和小伙伴们共同进步。

添加MAVEN依赖

<!--添加activemq的依赖-->
<!--ActiveMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>

  这里有的小伙伴可能依赖了

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>
 

毕竟一个顶的上上面的一大堆了,可出问题的时候也是来的猝不及防的。slf4j 冲突。大部分的程序中都会有slf4j,如果你报错了,就乖乖用上面的一大堆把。或者可以考虑把冲突的包从Maven程序中排除。具体怎么排除,可以百度了。

 

配置文件  (对于我个人来说,第一步,就是搞定配置文件,类什么的在配置的过程中再去书写)

 1 <?xml version="1.0" encoding="UTF-8" ?>
 2 
 3 <beans xmlns="http://www.springframework.org/schema/beans"
 4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5        xmlns:context="http://www.springframework.org/schema/context"
 6        xsi:schemaLocation="http://www.springframework.org/schema/beans
 7        http://www.springframework.org/schema/beans/spring-beans.xsd
 8        http://www.springframework.org/schema/context
 9        http://www.springframework.org/schema/context/spring-context.xsd ">
10 
11     <context:annotation-config />
12 
13     <!--Activemq的连接工厂-->
14     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
15         <property name="brokerURL" value="tcp://127.0.0.1:61616" />
16         <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重发机制 -->
17     </bean>
18     <!--spring jms为我们提供的连接池 获取一个连接工厂-->
19     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
20         <property name="targetConnectionFactory" ref="targetConnectionFactory" />
21     </bean>
22 
23     <!-- 消息目的地  点对点的模式-->
24     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
25         <constructor-arg value="SpringActiveMQMsg"/>
26     </bean>
27     <!-- 消息目的地  点对点的模式-->
28     <bean id="queueDestinationTwo" class="org.apache.activemq.command.ActiveMQQueue">
29         <constructor-arg value="EmailMQMsg"/>
30     </bean>
31     <!-- jms模板  用于进行消息发送-->
32     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
33         <property name="connectionFactory" ref="connectionFactory"/>
34     </bean>
35     <!--注入我们的生产者-->
36     <bean class="mq.ProduceServiceImpl"/>
37 
38 
39     <!-- 配置消息监听器-->
40     <bean id="SimpleMsgListener" class="mq.SimpleMsgListener"/>
41     <!--配置消息容器-->
42     <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
43         <!--配置连接工厂-->
44         <property name="connectionFactory" ref="connectionFactory"/>
45         <!--配置监听的队列-->
46         <property name="destination" ref="queueDestination"/>
47         <!--配置消息监听器-->
48         <property name="messageListener" ref="SimpleMsgListener"/>
49         <!--应答模式-->
50         <property name="sessionAcknowledgeMode" value="4"></property>
51     </bean>
52 
53 
54     <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 http://www.kuqin.com/shuoit/20140419/339344.html -->
55     <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
56         <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
57         <property name="useExponentialBackOff" value="true"></property>
58         <!--重发次数,默认为6次   这里设置为1次 -->
59         <property name="maximumRedeliveries" value="2"></property>
60         <!--重发时间间隔,默认为1秒 -->
61         <property name="initialRedeliveryDelay" value="1000"></property>
62         <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
63         <property name="backOffMultiplier" value="2"></property>
64         <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第
65             二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
66         <property name="maximumRedeliveryDelay" value="1000"></property>
67     </bean>
68 </beans>

  配置的第一步肯定是链接到MQ服务了。(ActionMQ 链接工厂可以添加的属性还有很多,例如 username、password ...)

  第二步:配置spring jms为我们提供的连接池

  第三步:配置队列(我的是队列模式,也就是 1对1消费模式(一条消息消费一次),还有一个发布订阅的模式(一条消息可以被订阅的多个客户端消费))

  第四步:配置模板

  第五步:声明注入消息发布者(这个没必要,正常可用的类就可以,在其中引入 @Autowired private JmsTemplate jmsTemplate; 模板类,就可以发送消息)

发送消息的类

 1 package mq;
 2 import org.springframework.beans.factory.annotation.Autowired;
 3 import org.springframework.jms.core.JmsTemplate;
 4 import org.springframework.jms.core.MessageCreator;
 5 import javax.annotation.Resource;
 6 import javax.jms.*;
 7 
 8 /**
 9  * @ Author     :tian
10  * @ Date       :Created in 下午 2:21 2019/6/13 0015
11  * @ Description:生产者的实现类
12  */
13 public class ProduceServiceImpl{
14     @Autowired
15     private JmsTemplate jmsTemplate;
16     @Resource(name = "queueDestination")
17     private Destination destination;
18 
19     /**
20      * 发送消息
21      * @param msg
22      */
23     public void sendMessage(final String msg) {
24         jmsTemplate.convertAndSend("SpringActiveMQMsg",msg);
25 //        jmsTemplate.send(destination , new MessageCreator() {
26 //            @Override
27 //            public Message createMessage(Session session) throws JMSException {
28 //                TextMessage textMessage = session.createTextMessage(msg);
29 //                return textMessage;
30 //            }
31 //        });
32         System.out.println("现在发送的消息为: " + msg);
33     }
34 
35     /**
36      * 接收消息
37      */
38     public TextMessage receive() {
39         TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg");
40         try {
41             System.out.println("从队列SpringActiveMQMsg收到了消息:\t"
42                     + tm.getText());
43         } catch (JMSException e) {
44             e.printStackTrace();
45         }
46 
47         return tm;
48 
49     }
50 }

发送消息可以用以上两种方式(不只这两种,但这两个最简单。)

方式1:

jmsTemplate.convertAndSend("SpringActiveMQMsg",msg) // 参数一:你声明的队列的名称 (队列可以不声明,直接到ActionMQ 的控制台创建也可以,简化配置,只不过要换服务器的时候还得创建一次。) 参数二:发送的消息

方式2:

上面程序中注释的部分

 

        jmsTemplate.send(destination , new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(msg);
                return textMessage;
            }
        });
它是可以正确发送消息的 这里可以看到 发送方法也有两个参数 (参数一:声明队列的beanID 参数二:发送的消息创建者)
在这里可以看到有一个 TextMessage 类, 这个是消息的一种类型,actionMQ中消息有多种类型 JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种。ActiveMQ也有对应的实现
详细的情况参见博文:https://www.cnblogs.com/dennisit/p/4551795.html

 

顺便介绍一下 消息的手动接收 TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg"); 使用jmsTemplate 模板 的receive("队列名称") ,可以一次消费一条消息。(肯定不止这一个方法了,具体的小伙伴们可以自行摸索)

消息的消费类型对应发送类型。如果想要消息自动接收消费,那就得配置消息监听器了。

标签:activemq,spring,jmsTemplate,发送,消息,整合,org,TextMessage,actionMQ
来源: https://www.cnblogs.com/hxz-nl/p/11015219.html