ActiveMQ学习笔记(十八)Message Dispatch高级特性一
作者:互联网
一、Message Cursors
这里的批次,就好比是分页。
二、Async Sends
三、Dispatch Policies
3.1 严格顺序分发策略
测试:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class NonPersistenceSender {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("MyTopic");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message111--" + i);
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class NoPerReceiver2 {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("MyTopic");
for (int i = 0; i < 2; i++) {
final MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage txtMsg = (TextMessage) message;
try {
System.out.println(consumer + "收到消息:" + txtMsg.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
}
先启动消费者,后启动生产者。
可以看出,多个消费者消费同一个生产者的消息时,是按照顺序消费的。
修改一下消费者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class NoPerReceiver2 {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
for (int i = 0; i < 2; i++) {
Thread t = new MyT(factory);
t.start();
}
}
}
class MyT extends Thread {
private ConnectionFactory connectionFactory = null;
public MyT(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public void run() {
try {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("MyTopic");
for (int i = 0; i < 2; i++) {
final MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage txtMsg = (TextMessage) message;
try {
System.out.println(consumer + "收到消息:" + txtMsg.getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
先运行消费者,后运行生产者
这就演示了不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。
3.2 轮询分发策略
————————————————
版权声明:本文为CSDN博主「csdn_kenneth」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/csdn_kenneth/article/details/83044835
标签:connectionFactory,ActiveMQConnectionFactory,consumer,Dispatch,connection,session 来源: https://blog.csdn.net/daiyuenlong110/article/details/115181212