其他分享
首页 > 其他分享 > activemq 高级使用

activemq 高级使用

作者:互联网

package top.activemq.util.performance;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Asynchronous-send example: publishes a few text messages and registers an
 * {@link AsyncCallback} per message to learn whether each send succeeded.
 *
 * <p>Async send can be enabled in any of three ways:
 * <pre>
 * // 1. Append the jms.useAsyncSend parameter to the broker URI
 * cf = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useAsyncSend=true");
 * // 2. Set the property on the factory (its type must be ActiveMQConnectionFactory)
 * ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
 * // 3. Set the property on the connection (its type must be ActiveMQConnection)
 * ((ActiveMQConnection) connection).setUseAsyncSend(true);
 * </pre>
 *
 * @author: he QQ:       905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21    6:32 PM
 */

public class MQProducer {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-async";

    /**
     * Sends three text messages asynchronously to {@code queue-async}.
     *
     * @param args unused
     * @throws JMSException if connecting, sending, or closing fails; failures are
     *                      propagated instead of being silently swallowed
     */
    public static void main(String[] args) throws JMSException {

        // The factory must be typed as ActiveMQConnectionFactory to expose setUseAsyncSend.
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        // Enable async send explicitly.
        // NOTE(review): the original comment said this is already the default; the ActiveMQ
        // documentation lists useAsyncSend as false by default - confirm for your version.
        factory.setUseAsyncSend(true);

        Connection connection = factory.createConnection();
        Session session = null;
        ActiveMQMessageProducer producer = null;
        try {
            connection.start();

            // Non-transacted session, so there is nothing to commit or roll back.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);

            // The ActiveMQ-specific producer type is needed for send(Message, AsyncCallback).
            producer = (ActiveMQMessageProducer) session.createProducer(queue);

            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("msg" + i);
                // NOTE(review): the JMS API documents setJMSMessageID as intended for providers,
                // which set the ID during send; the value printed in the callbacks may therefore
                // be the provider-assigned ID rather than this UUID - confirm, and consider a
                // message property or JMSCorrelationID for business keys.
                textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--orderCode");
                // Pass a callback so the outcome of the async send is reported.
                producer.send(textMessage, new AsyncCallback() {
                    @Override
                    public void onSuccess() {
                        try {
                            System.out.println(textMessage.getJMSMessageID() + " 发送成功!");
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onException(JMSException e) {
                        try {
                            System.out.println(textMessage.getJMSMessageID() + " 发送失败!");
                        } catch (JMSException je) {
                            je.printStackTrace();
                        }
                        // Report why the send failed; the original dropped the cause.
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            // Close in reverse order of creation; the connection is closed even if closing
            // the producer or session throws.
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }
    }
}


package top.activemq.util.performance;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ScheduledMessage;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Delayed-delivery example: sends one text message carrying ActiveMQ
 * scheduled-message properties (initial delay, repeat period, repeat count).
 *
 * <p>NOTE(review): scheduled delivery is a broker-side feature; the ActiveMQ
 * documentation says the broker scheduler must be enabled (schedulerSupport) -
 * confirm the broker configuration.
 *
 * @author: he QQ:       905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21    6:34 PM
 */
public class MQProducer1 {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-delay";

    /**
     * Sends a single scheduled message to {@code queue-delay}.
     *
     * @param args unused
     * @throws JMSException if connecting, sending, or closing fails; failures are
     *                      propagated instead of being silently swallowed
     */
    public static void main(String[] args) throws JMSException {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        Session session = null;
        ActiveMQMessageProducer producer = null;
        try {
            connection.start();

            // Non-transacted session, so there is nothing to commit or roll back.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            producer = (ActiveMQMessageProducer) session.createProducer(queue);

            TextMessage textMessage = session.createTextMessage("delayMsg");
            // Scheduling properties:
            // wait 5 seconds before the first delivery,
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000L);
            // then wait 2 seconds between repeated deliveries,
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 2000L);
            // repeating 3 times - with the initial delivery that is 4 deliveries in total.
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 3);

            producer.send(textMessage);
        } finally {
            // Close in reverse order of creation; the connection is closed even if closing
            // the producer or session throws.
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }
    }
}


package top.activemq.util.performance;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ScheduledMessage;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Cron-scheduled delivery example: sends one text message whose delivery is
 * driven by a cron expression set in the ActiveMQ scheduled-message property.
 *
 * <p>NOTE(review): scheduled delivery is a broker-side feature; the ActiveMQ
 * documentation says the broker scheduler must be enabled (schedulerSupport) -
 * confirm the broker configuration.
 *
 * @author: he QQ:       905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21    6:35 PM
 */
public class MQProducer2 {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue-delay";

    /**
     * Sends a single cron-scheduled message to {@code queue-delay}.
     *
     * @param args unused
     * @throws JMSException if connecting, sending, or closing fails; failures are
     *                      propagated instead of being silently swallowed
     */
    public static void main(String[] args) throws JMSException {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        Session session = null;
        ActiveMQMessageProducer producer = null;
        try {
            connection.start();

            // Non-transacted session, so there is nothing to commit or roll back.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(QUEUE_NAME);
            producer = (ActiveMQMessageProducer) session.createProducer(queue);

            TextMessage textMessage = session.createTextMessage("delayMsgcron");
            // Optional, disabled in the original (would also need a java.util.UUID import):
            //textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, UUID.randomUUID().toString());
            // Cron expression controlling delivery; all five fields are wildcards.
            // NOTE(review): presumably this fires once per minute - confirm against the
            // ActiveMQ scheduler documentation.
            textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"* * * * *");

            producer.send(textMessage);
        } finally {
            // Close in reverse order of creation; the connection is closed even if closing
            // the producer or session throws.
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }
    }
}


package top.activemq.util.queue;

/**
 * description:死信
 *
 * @author: he QQ:       905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21    7:51 PM
 */
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;

import javax.jms.*;

/**
 * Producer half of the redelivery / dead-letter example: puts three text
 * messages on the {@code Redelivery} queue.
 */
public class JmsProducer_Redelivery {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String QUEUE_NAME = "Redelivery";

    /**
     * Sends {@code message-0} .. {@code message-2} to the queue.
     *
     * @param args unused
     * @throws JMSException if connecting, sending, or closing fails
     */
    public static void main(String[] args) throws JMSException {
        // Connection factory for the given URL, using the default user name and password.
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = null;
        MessageProducer messageProducer = null;
        try {
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination (Queue and Topic are the two Destination sub-interfaces).
            Queue queue = session.createQueue(QUEUE_NAME);
            messageProducer = session.createProducer(queue);
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("message-" + i);
                messageProducer.send(textMessage);
            }
        } finally {
            // Close in reverse order of creation, also when a send fails; the connection
            // is closed even if closing the producer or session throws.
            try {
                if (messageProducer != null) {
                    messageProducer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }
    }
}


package top.activemq.util.queue;

/**
 * description: 死信
 *
 * @author: he QQ:       905845006
 * @email: 905845006@qq.com
 * @date: 2020/3/21    7:51 PM
 */
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;

import javax.jms.*;

/**
 * Consumer half of the redelivery / dead-letter example: reads messages from
 * the {@code Redelivery} queue inside a transacted session and deliberately
 * never commits, so the consumed messages are not acknowledged.
 */
public class JmsConsumer_Redelivery {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    public static final String QUEUE_NAME = "Redelivery";

    /**
     * Drains the queue, printing each message, and exits once no message
     * arrives within one second. The transaction is intentionally left
     * uncommitted to simulate a failing consumer.
     *
     * @param args unused
     * @throws JMSException if connecting, receiving, or closing fails
     */
    public static void main(String[] args) throws JMSException {
        // Connection factory for the given URL, using the default user name and password.
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // Install the redelivery policy BEFORE creating the connection. The original set it
        // afterwards, which only takes effect if the already-created connection happens to
        // share policy state with the factory.
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3); // allow at most 3 redeliveries
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

        Connection connection = activeMQConnectionFactory.createConnection();
        Session session = null;
        MessageConsumer messageConsumer = null;
        try {
            connection.start();
            // Transacted session (first argument is true).
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // Destination (Queue and Topic are the two Destination sub-interfaces).
            Queue queue = session.createQueue(QUEUE_NAME);
            messageConsumer = session.createConsumer(queue);

            // The producer sends TextMessage instances, so the received message is cast back.
            // receive(1000L) waits at most one second; null means nothing arrived in time.
            TextMessage textMessage;
            while ((textMessage = (TextMessage) messageConsumer.receive(1000L)) != null) {
                System.out.println("消费者消费:" + textMessage.getText());
            }

            // A transacted session normally requires commit(); it is left out on purpose
            // to simulate a failure.
            // session.commit();
        } finally {
            // Close in reverse order of creation; the connection is closed even if closing
            // the consumer or session throws.
            try {
                if (messageConsumer != null) {
                    messageConsumer.close();
                }
                if (session != null) {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }
    }

}
 来源:锌闻网

标签:textMessage,ActiveMQConnectionFactory,高级,connection,session,使用,import,activemq
来源: https://www.cnblogs.com/vwvwvwgwg/p/12542522.html