其他分享
首页 > 其他分享> > rabbitMq延迟消息队列

rabbitMq延迟消息队列

作者:互联网

 

1、新建立消息队列配置文件application.properties

 #rabbit消息队列的配置
  spring.cloud.stream.binders.hua_work_proof.type=rabbit
 spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.host=localhost
  spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.port=5672
  spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.username=user
  spring.cloud.stream.binders.hua_work_proof.environment.spring.rabbitmq.password=123456

#
spring.cloud.stream.bindings.envelope_output.binder=hua_work_proof spring.cloud.stream.bindings.envelope_output.destination=envelope_stream spring.cloud.stream.bindings.envelope_output.consumer.maxAttempts=1 spring.cloud.stream.bindings.envelope_output.consumer.concurrency=5 spring.cloud.stream.bindings.envelope_output.producer.required-groups=envelope_input spring.cloud.stream.rabbit.bindings.envelope_output.producer.delayed-exchange=true //延迟队列的配置 spring.cloud.stream.bindings.envelope_input.binder=hua_work_proof spring.cloud.stream.bindings.envelope_input.destination=envelope_stream spring.cloud.stream.bindings.envelope_input.group=envelope_input spring.cloud.stream.bindings.envelope_input.consumer.maxAttempts=1 spring.cloud.stream.rabbit.bindings.envelope_input.consumer.delayed-exchange=true //延迟队列的配置

2.1发布端接口

package com.alpha.product.service;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface StreamOutput {
    
    final String USER_OUTPUT  ="user_output";

    @Output(USER_OUTPUT  )
    MessageChannel userOutput();

}

2.2发布端类

package com.alpha.product.service;

import java.time.Duration;
import java.time.LocalDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

import com.alpha.frame.enhance.module.common.anno.service.CommonService;


@EnableBinding(StreamOutput.class) //绑定发布端
@CommonService
public class StreamOutputService {

private static final Logger log = LoggerFactory.getLogger(StreamOutputService.class);

  @Autowired
  private StreamOutput streamOutput;

  public void send(Long userId,LocalDateTime endTime){
    if(userId== null || endTime == null){
    return;
  }
  LocalDateTime now = LocalDateTime.now();
  long duration = Duration.between(now,endTime).toMillis();
  log.warn("准备发送延迟消息队列  userId:{},now:{} endTime:{} Duration:{}",userId,now,endTime,duration);
  streamOutput.envelopeOutput().send(MessageBuilder.withPayload(userId).setHeader("x-delay",duration).build());//延迟队列的发送方式
  }
}

 2.1消费端接口

package com.alpha.product.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface StreamInput {
    
    final String USER_INPUT  ="user_input";

    @Input(USER_INPUT)
    SubscribableChannel userInput();

}

2.2消费者

package com.alpha.product.stream;

import java.time.LocalDateTime;

import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;


@EnableBinding(StreamInput.class)//绑定消费端
@Component
public class StreamInputService {
  private static final Logger log = LoggerFactory.getLogger(StreamInputService.class);

  @StreamListener(StreamInput.USER_INPUT) //监听RabbitMq的消息
  private void listener(Message<String> message) {
    log.warn("now:{} 接受到一条消息队列 content:{}",LocalDateTime.now(),JSONObject.toJSONString(message));
    if(message == null){
    return;
  }
  try{
    String payload = message.getPayload();
    if(!NumberUtils.isParsable(payload)){
      log.error("消息内容异常:payload:{}",payload);
      return;
    }
    //业务操作
  }catch(Exception e){
    e.printStackTrace();
  }
 }

}

 3,测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {BrowserDevApplication.class})
public class TestDemo {

    //当前时间的明天
  @Test
  public void Test(){
    outputService.send(sendRec.getUserId(),LocalDateTime.now().minusDays(-1));
  }
}

 

标签:stream,队列,spring,envelope,rabbitMq,org,import,cloud,延迟
来源: https://www.cnblogs.com/fmhh/p/14463628.html