其他分享
首页 > 其他分享> > 谷粒商城高级篇—RabbitMQ

谷粒商城高级篇—RabbitMQ

作者:互联网

延时队列(实现定时任务)

未付款订单,超时自动取消并释放占有。

常用解决方案:定时任务轮询

缺点:消耗内存,增加数据库压力,时间误差大

解决:RabbitMQ 消息TTL和死信Exchange结合

控制消息在一段时间变成死信,控制死信的消息被路由到某个指定交换机,实验延时队列。

延时队列实现-1

延时队列实现-2

推荐方式一给队列设置过期时间,因为RabbitMQ是惰性检查。

SpringBoot整合RabbitMQ

  1. 引入依赖

    <!--RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 自动配置了 RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate

    #配置RabbitMQ
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
    

    测试 amqpAdmin、rabbitTemplate

    @Autowired
    AmqpAdmin amqpAdmin;
    
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    /* *
    * 如何创建Exchange、Queue、Binding
    * 如何收发消息
    */
    @Test
    public void createExchange() {
        //Exchange
        amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange",true,false));
        log.info("Exchange[{}]创建成功","hello-java-exchange");
    }
    @Test
    public void createQueue(){
        amqpAdmin.declareQueue(new Queue("hello-java-queue",true,false,false));
        log.info("Queue[{}]创建成功","hello-java-queue");
    }
    @Test
    public void createBinding(){
        amqpAdmin.declareBinding(new Binding("hello-java-queue",
                                             Binding.DestinationType.QUEUE,
                                             "hello-java-exchange",
                                             "hello.java",
                                             null));
        log.info("Binding[{}]创建成功","hello-java-binding");
    }
    @Test
    public void sendMessageTest(){
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("哈哈");
        // 如果发送消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable
        String msg = "hello world!";
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",reasonEntity);
        log.info("消息发送完成{}",reasonEntity);
    }
    
  3. @EnableRabbit

  4. 监听消息 @RabbitListener

     @RabbitListener(queues = {"hello-java-queue"})
        public void recieverMessage(Message message,
                                    OrderReturnReasonEntity content,
                                    Channel channel){
            //消息体
            byte[] body = message.getBody();
            //消息头
            MessageProperties messageProperties = message.getMessageProperties();
            System.out.println("接收到消息。。。内容:"+message+"==>类型:"+message.getClass());
        }
    
    • @RabbitListener 类+方法上
    • @RabbitHandler 标在方法上(重载区分不同的消息)

Queue可以很多人监听,同一个消息只能一个客户端收到

只要一个消息完全处理完,方法运行结束,可以接收下一个消息

RabbitMQ消息确认机制-可靠抵达

保证可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制。

延时队列关闭订单模拟

  1. 创建Exchange Queue Binding

    @Configuration
    public class MyMQConfig {
    
        //死信队列
        @Bean
        public Queue orderDelayQueue(){
            /*
                    Queue(String name,  队列名字
                    boolean durable,  是否持久化
                    boolean exclusive,  是否排他
                    boolean autoDelete, 是否自动删除
                    Map<String, Object> arguments) 属性
                 */
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", "order-event-exchange");
            arguments.put("x-dead-letter-routing-key", "order.release.order");
            arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
            Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
    
            return queue;
        }
    
        //普通队列
        @Bean
        public Queue orderReleaseOrderQueue(){
            Queue queue = new Queue("order.release.order.queue", true, false, false);
    
            return queue;
        }
    
        //TopicExchange
        @Bean
        public Exchange orderEventExchange(){
            /*
                 *   String name,
                 *   boolean durable,
                 *   boolean autoDelete,
                 *   Map<String, Object> arguments
                 * */
            return new TopicExchange("order-event-exchange", true, false);
    
        }
    
        @Bean
        public Binding orderCreateOrderBinding(){
            /*
                 * String destination, 目的地(队列名或者交换机名字)
                 * DestinationType destinationType, 目的地类型(Queue、Exhcange)
                 * String exchange,
                 * String routingKey,
                 * Map<String, Object> arguments
                 * */
            return new Binding("order.delay.queue",
                               Binding.DestinationType.QUEUE,
                               "order-event-exchange",
                               "order.create.order",
                               null);
    
        }
    
        @Bean
        public Binding orderReleaseOrderBinding(){
            return new Binding("order.release.order.queue",
                               Binding.DestinationType.QUEUE,
                               "order-event-exchange",
                               "order.release.order",
                               null);
        }
    
    }
    

    创建订单

    @GetMapping("/test/createOrder")
    public String creatOrderTest(){
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn(UUID.randomUUID().toString());
        entity.setModifyTime(new Date());
    
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
        return "ok";
    }
    

    消费者,接收过期订单

    @RabbitListener(queues = "order.release.order.queue")
    public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到过期的订单信息:准备关闭订单"+entity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    

如何保证消息可靠性

丢失

重复

积压

标签:exchange,RabbitMQ,order,queue,消息,new,public,商城,谷粒
来源: https://www.cnblogs.com/alvin103/p/16057092.html