ActiveMQ 高级使用
作者:互联网
package top.activemq.util.performance;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * description: asynchronous send with a per-message completion callback.
 *
 * <p>Asynchronous send can be enabled in three ways:
 * <ol>
 *   <li>Append the {@code jms.useAsyncSend} parameter to the broker URI:
 *       {@code new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useAsyncSend=true")}.</li>
 *   <li>Set the property on the factory, which must be an {@code ActiveMQConnectionFactory}:
 *       {@code ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true)}.</li>
 *   <li>Set the property on the connection, which must be an {@code ActiveMQConnection}:
 *       {@code ((ActiveMQConnection) connection).setUseAsyncSend(true)}.</li>
 * </ol>
 *
 * @author: he QQ: 905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21 6:32 PM
 */
public class MQProducer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-async";
    /** Number of messages sent by this demo. */
    private static final int MESSAGE_COUNT = 3;
    /** Upper bound, in seconds, on waiting for the send callbacks before closing. */
    private static final long CALLBACK_TIMEOUT_SECONDS = 10L;

    /**
     * Sends {@link #MESSAGE_COUNT} text messages to {@link #QUEUE_NAME} asynchronously and
     * prints the outcome of each send from its callback.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or sending a message fails
     */
    public static void main(String[] args) throws JMSException {
        // The factory must be declared as ActiveMQConnectionFactory to reach setUseAsyncSend.
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        // Enable async send explicitly; per the ActiveMQ docs this option defaults to false.
        factory.setUseAsyncSend(true);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            // The callback-taking send(...) overload lives on ActiveMQMessageProducer.
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);

            // Counts outstanding callbacks so the connection is not closed underneath them.
            final java.util.concurrent.CountDownLatch pending =
                    new java.util.concurrent.CountDownLatch(MESSAGE_COUNT);

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                final TextMessage textMessage = session.createTextMessage("msg" + i);
                // NOTE(review): the JMS API reserves setJMSMessageID for providers, which assign
                // their own ID on send, so this value is expected to be replaced. Carry the order
                // code in a message property if consumers need it.
                textMessage.setJMSMessageID(UUID.randomUUID().toString() + "--orderCode");
                producer.send(textMessage, new AsyncCallback() {
                    @Override
                    public void onSuccess() {
                        try {
                            System.out.println(textMessage.getJMSMessageID() + " 发送成功!");
                        } catch (JMSException e) {
                            e.printStackTrace();
                        } finally {
                            pending.countDown();
                        }
                    }

                    @Override
                    public void onException(JMSException e) {
                        try {
                            System.out.println(textMessage.getJMSMessageID() + " 发送失败!");
                            // Report why the send failed instead of dropping the cause.
                            e.printStackTrace();
                        } catch (JMSException je) {
                            je.printStackTrace();
                        } finally {
                            pending.countDown();
                        }
                    }
                });
            }

            try {
                boolean allReported =
                        pending.await(CALLBACK_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
                if (!allReported) {
                    System.err.println("Timed out waiting for send callbacks; still pending: "
                            + pending.getCount());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fall through to cleanup.
                Thread.currentThread().interrupt();
            }

            producer.close();
            session.close();
        } finally {
            // Closing the connection also releases any session/producer still open after a failure.
            connection.close();
        }
    }
}
package top.activemq.util.performance;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ScheduledMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * description: delayed (and repeated) delivery using ActiveMQ scheduled-message properties.
 *
 * <p>NOTE(review): scheduled delivery is a broker-side feature; confirm the broker is started
 * with scheduler support enabled, otherwise these properties are presumably ignored.
 *
 * @author: he QQ: 905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21 6:34 PM
 */
public class MQProducer1 {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-delay";

    /**
     * Sends one text message to {@link #QUEUE_NAME} with delay, period and repeat
     * scheduling properties set.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or sending the message fails
     */
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);

            TextMessage textMessage = session.createTextMessage("delayMsg");
            // Wait 5 seconds before the first delivery.
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);
            // Then wait 2 seconds between repeated deliveries.
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 2000L);
            // Repeat 3 times; with the initial delivery that is 4 deliveries in total.
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 3);

            // Failures propagate to the caller instead of being silently swallowed.
            producer.send(textMessage);

            producer.close();
            session.close();
        } finally {
            // Closing the connection also releases any session/producer still open after a failure.
            connection.close();
        }
    }
}
package top.activemq.util.performance;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ScheduledMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * description: scheduled delivery driven by a CRON expression.
 *
 * <p>NOTE(review): scheduled delivery is a broker-side feature; confirm the broker is started
 * with scheduler support enabled, otherwise the CRON property is presumably ignored.
 *
 * @author: he QQ: 905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21 6:35 PM
 */
public class MQProducer2 {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-delay";

    /**
     * Sends one text message to {@link #QUEUE_NAME} carrying a CRON schedule property.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or sending the message fails
     */
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);

            TextMessage textMessage = session.createTextMessage("delayMsgcron");
            // Five-field CRON expression with every field wildcarded, i.e. every minute.
            // NOTE(review): with no repeat limit the broker presumably keeps firing this job
            // until it is removed from the scheduler - confirm before running against a shared broker.
            textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");

            // Failures propagate to the caller instead of being silently swallowed.
            producer.send(textMessage);

            producer.close();
            session.close();
        } finally {
            // Closing the connection also releases any session/producer still open after a failure.
            connection.close();
        }
    }
}
package top.activemq.util.queue;
/**
* description:死信
*
* @author: he QQ: 905845006
* @email: 905845006@qq.com
* @date: 2020/3/21 7:51 PM
*/
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import javax.jms.*;
/**
 * Producer half of the redelivery / dead-letter demo: puts three text messages
 * on the {@code Redelivery} queue.
 */
public class JmsProducer_Redelivery {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String QUEUE_NAME = "Redelivery";

    /**
     * Connects to the broker and sends {@code message-0} .. {@code message-2} to
     * {@link #QUEUE_NAME}.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or sending a message fails
     */
    public static void main(String[] args) throws JMSException {
        // Build the factory for the given URL; no user name or password is supplied.
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            MessageProducer messageProducer = session.createProducer(queue);

            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("message-" + i);
                messageProducer.send(textMessage);
            }

            // Release resources in the reverse order of creation.
            messageProducer.close();
            session.close();
        } finally {
            // Runs even when setup or a send throws, so the connection is never leaked;
            // closing it also releases any session/producer that is still open.
            connection.close();
        }
    }
}
package top.activemq.util.queue;
/**
* description: 死信
*
* @author: he QQ: 905845006
* @email: 905845006@qq.com
* @date: 2020/3/21 7:51 PM
*/
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import javax.jms.*;
/**
 * Consumer half of the redelivery / dead-letter demo: reads messages from the
 * {@code Redelivery} queue inside a transacted session that is deliberately never
 * committed, so the broker redelivers them on the next run.
 */
public class JmsConsumer_Redelivery {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String QUEUE_NAME = "Redelivery";

    /**
     * Drains {@link #QUEUE_NAME}, printing each message body, then closes the session
     * without committing.
     *
     * @param args unused
     * @throws JMSException if connecting to the broker or receiving a message fails
     */
    public static void main(String[] args) throws JMSException {
        // Build the factory for the given URL; no user name or password is supplied.
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // Configure the redelivery policy BEFORE creating the connection, so the connection is
        // built with it rather than relying on the factory and connection sharing policy state.
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3); // allow at most 3 redeliveries
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            // Transacted session: the acknowledge-mode argument is not what governs
            // acknowledgement here, the commit/rollback of the transaction is.
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            while (true) {
                // receive(1000L) waits up to one second and returns null when nothing arrived;
                // the no-argument receive() would block indefinitely instead.
                // The cast assumes TextMessage payloads, which is what JmsProducer_Redelivery sends.
                TextMessage textMessage = (TextMessage) messageConsumer.receive(1000L);
                if (textMessage == null) {
                    break;
                }
                System.out.println("消费者消费:" + textMessage.getText());
            }

            // The session is transacted, so normally commit() is required here. It is omitted
            // on purpose to simulate a failing consumer and trigger redelivery.
            // NOTE(review): once the redelivery limit is exceeded the broker is expected to
            // move the messages to its dead-letter queue - confirm against broker config.
            // session.commit();

            // Release resources in the reverse order of creation.
            messageConsumer.close();
            session.close();
        } finally {
            // Runs even when setup or receive throws, so the connection is never leaked;
            // closing it also releases any session/consumer that is still open.
            connection.close();
        }
    }
}
来源:锌闻网
标签:textMessage,ActiveMQConnectionFactory,高级,connection,session,使用,import,activemq 来源: https://www.cnblogs.com/vwvwvwgwg/p/12542522.html