消息消费者和生产者
作者:互联网
消费者
package org.example;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* @author yyh
* @create 2021-04-23 20:27
*/
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.43.128:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("我是2号");
//1 创建连接工程
ActiveMQConnectionFactory activeMQConnection = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2 通过连接工程 获得连接connection并启动访问
Connection connection = activeMQConnection.createConnection();
connection.start();
//3创建session会话 2个参数 事务 签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4创建目的地(队列或主题)
Queue queue = session.createQueue(QUEUE_NAME);
//5 创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
/* //同步阻塞
订阅者或接收者调用messageCosumer的receive方法来接收消息,receive在接收消息之前或者超时之前一直阻塞
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if (null != textMessage) {
System.out.println("消费者接收消息:" + textMessage.getText());
} else {
break;
}
}*/
//2 通过消息的监听 来消费消息
//异步非阻塞方式
/*订阅者或接收者通过messageConsumer的setMessageListener注册一个消息监听器
当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法
* */
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null!=message&&message instanceof TextMessage){
TextMessage textMessage= (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); //保证控制台不灭
//1 先生产 启动1号消费者消费 在启动二号消费者消费 2号还能消费吗
//1号可以消费
//2号不可以消费
//2 先启动2个消费者 在生产
//2.1 两个消费者都有6条
//2.2 先到先消费
//2.3 一人一半 y
consumer.close();
session.close();
connection.close();
}
}
生产者
package org.example;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Hello world!
*
*/
public class JmsProduce
{
public static final String ACTIVEMQ_URL="tcp://192.168.43.128:61616";
public static final String QUEUE_NAME="queue01";
public static void main( String[] args ) throws JMSException {
//1 创建连接工程
ActiveMQConnectionFactory activeMQConnection=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2 通过连接工程 获得连接connection并启动访问
Connection connection = activeMQConnection.createConnection();
connection.start();
//3创建session会话 2个参数 事务 签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4创建目的地(队列或主题)
Queue queue= session.createQueue(QUEUE_NAME);
//5 创建消息生产者
MessageProducer messageProducer = session.createProducer(queue);
// 6通过消息生产者生产三条消息
for(int i=0;i<3;i++){
//7 创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
//8通过messageproducer发送给MQ
messageProducer.send(textMessage);
}
//9关闭资源
messageProducer.close();
session.close();
connection.close();
//消息发布到MQ完成
System.out.println("完成");
}
}
标签:消费者,生产者,connection,session,static,消息,textMessage,public,String 来源: https://blog.csdn.net/qq_43338663/article/details/116099735