其他分享
首页 > 其他分享> > 基于RabbltMQ延迟插件实现延迟队列代码示例

基于RabbltMQ延迟插件实现延迟队列代码示例

作者:互联网

上一篇文章写了docker安装RabbitMQ及延迟插件的安装如果没有安装插件的可以去上一个文章去安装一下,这篇的话是基于RabbitMQ延迟插件实现延迟队列的示例

那么废话不多说 直接上代码!!
首先创建延迟队列配置类 DelayedQueueConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //routingkey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    //声明交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        //设置延迟交换机为直接类型
        arguments.put("x-delayed-type", "direct");
        /**
         * 参数说明
         * 1.交换机名字
         * 2.交换机类型
         * 3.是否持久化
         * 4.是否自动删除
         * 5.其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }
    //声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //绑定
    @Bean
    public Binding delayedQueueBindingdelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

接着编写生产者代码

   //基于插件发送延迟消息
    @GetMapping("/senddelayedMsg/{message}/{delayedTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayedTime) {
        log.info("当前时间:{},发送一条时长{}毫秒给延迟队列delayed.queue:{}", new Date().toString(), delayedTime, message);
        /**
         * convertAndSend 方法参数说明
         * 1.交换机名称
         * 2.routingkey
         * 3.发送的消息
         * 4.发送完消息后的回调
         */
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, msg -> {
            msg.getMessageProperties().setDelay(delayedTime);
            return msg;
        });
    }

最后编写消费者代码

import com.zfd.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DelayedqueueConsumer {
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}", new Date().toString(), msg);
    }
}

接着运行程序 发送请求
image

到这里我们基于插件实现RabbitMQ延迟队列就写完了!!

标签:插件,示例,DELAYED,springframework,org,import,public,延迟
来源: https://www.cnblogs.com/yczgr/p/delayedqueue.html