springboot整合RocketMQ
作者:互联网
导入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.1</version> </dependency>
/** * 生产者 同步生产消息 普通队列 * @throws Exception */ @Test void provider() throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("orderGroup1"); // 设置NameServer的地址 producer.setNamesrvAddr("192.168.1.137:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } /** * 消费者 * @throws Exception */ @Test void consumer() throws Exception{ // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderGroup1"); // 设置NameServer的地址 consumer.setNamesrvAddr("192.168.1.137:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("TopicTest", "*");//接受所有的TopicTest里的内容 // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); System.out.printf("Consumer Started.%n"); }
延时队列
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,
默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。
注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况: level == 0,消息为非延迟消息 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s level > maxLevel,则level== maxLevel,例如level==20,延迟2h 定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,
即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。 需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
/** * 消费延时队列 */ @Test public void consumerDelayMSG() throws Exception{ // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); // 设置NameServer的地址 consumer.setNamesrvAddr("192.168.1.137:9876"); // 订阅Topics consumer.subscribe("TestTopicDelay", "*"); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); Thread.sleep(Integer.MAX_VALUE); } @Test public void producerDelayMSG() throws Exception{ // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // 设置NameServer的地址 producer.setNamesrvAddr("192.168.1.137:9876"); // 启动生产者 producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopicDelay", ("Hello scheduled message " + i).getBytes()); // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) message.setDelayTimeLevel(4); // 发送消息 producer.send(message); } // 关闭生产者 producer.shutdown(); }
springboot整合rocketMQ
导入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.1</version> </dependency>
定义生产者和消费者
@Component public class OrderMsgProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 生产者生产消息 * @param orderSN */ public void sendOrderMessage(String orderSN){ rocketMQTemplate.syncSend(OrderConstant.ROCKETMQ_ORDER_TOPIC, MessageBuilder.withPayload(orderSN).build(), 5000,4); } }
@RocketMQMessageListener(topic = OrderConstant.ROCKETMQ_ORDER_TOPIC,consumerGroup = "${rocketmq.consumer.group}") @Component public class OrderMsgConsumer implements RocketMQListener<String> { /** * 消费者订阅topic 定义消费组 实现监听 * @param s */ @Override public void onMessage(String s) { System.out.println("收到的消息:"+s); } }
标签:springboot,producer,消息,整合,new,message,consumer,public,RocketMQ 来源: https://www.cnblogs.com/Lcch/p/16398600.html