rabbitMq延迟消息队列
作者:互联网
1、新建立消息队列配置文件application.properties
#rabbit消息队列的配置
spring.cloud.stream.binders.hua_work_proof.type=rabbit
spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.username=user
spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.password=123456
#
spring.cloud.stream.bindings.envelope_output.binder=hua_work_proof spring.cloud.stream.bindings.envelope_output.destination=envelope_stream spring.cloud.stream.bindings.envelope_output.consumer.maxAttempts=1 spring.cloud.stream.bindings.envelope_output.consumer.concurrency=5 spring.cloud.stream.bindings.envelope_output.producer.required-groups=envelope_input spring.cloud.stream.rabbit.bindings.envelope_output.producer.delayed-exchange=true //延迟队列的配置 spring.cloud.stream.bindings.envelope_input.binder=hua_work_proof spring.cloud.stream.bindings.envelope_input.destination=envelope_stream spring.cloud.stream.bindings.envelope_input.group=envelope_input spring.cloud.stream.bindings.envelope_input.consumer.maxAttempts=1 spring.cloud.stream.rabbit.bindings.envelope_input.consumer.delayed-exchange=true //延迟队列的配置
2.1发布端接口
package com.alpha.product.service; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface StreamOutput { final String USER_OUTPUT ="user_output"; @Output(USER_OUTPUT ) MessageChannel userOutput(); }
2.2发布端类
package com.alpha.product.service;
import java.time.Duration;
import java.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import com.alpha.frame.enhance.module.common.anno.service.CommonService;
@EnableBinding(StreamOutput.class) //绑定发布端
@CommonService
public class StreamOutputService {
private static final Logger log = LoggerFactory.getLogger(StreamOutputService.class);
@Autowired
private StreamOutput streamOutput;
public void send(Long userId,LocalDateTime endTime){
if(userId== null || endTime == null){
return;
}
LocalDateTime now = LocalDateTime.now();
long duration = Duration.between(now,endTime).toMillis();
log.warn("准备发送延迟消息队列 userId:{},now:{} endTime:{} Duration:{}",userId,now,endTime,duration);
streamOutput.envelopeOutput().send(MessageBuilder.withPayload(userId).setHeader("x-delay",duration).build());//延迟队列的发送方式
}
}
2.1消费端接口
package com.alpha.product.stream; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface StreamInput { final String USER_INPUT ="user_input"; @Input(USER_INPUT) SubscribableChannel userInput(); }
2.2消费者
package com.alpha.product.stream;
import java.time.LocalDateTime;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
@EnableBinding(StreamInput.class)//绑定消费端
@Component
public class StreamInputService {
private static final Logger log = LoggerFactory.getLogger(StreamInputService.class);
@StreamListener(StreamInput.USER_INPUT) //监听RabbitMq的消息
private void listener(Message<String> message) {
log.warn("now:{} 接受到一条消息队列 content:{}",LocalDateTime.now(),JSONObject.toJSONString(message));
if(message == null){
return;
}
try{
String payload = message.getPayload();
if(!NumberUtils.isParsable(payload)){
log.error("消息内容异常:payload:{}",payload);
return;
}
//业务操作
}catch(Exception e){
e.printStackTrace();
}
}
}
3,测试类
@RunWith(SpringRunner.class) @SpringBootTest(classes = {BrowserDevApplication.class}) public class TestDemo { //当前时间的明天
@Test
public void Test(){
outputService.send(sendRec.getUserId(),LocalDateTime.now().minusDays(-1));
}
}
标签:stream,队列,spring,envelope,rabbitMq,org,import,cloud,延迟 来源: https://www.cnblogs.com/fmhh/p/14463628.html