编程语言
首页 > 编程语言> > java中使用rabbitmq以及遇到的问题

java中使用rabbitmq以及遇到的问题

作者:互联网

现在是北京时间2022/09/14/17:21,天气渐微凉,浅聊一下java中如何使用rabbitmq,

估计能看到这里,想必你肯定翻阅了很多博客了,那么废话不多说,上代码

那么,首先,我们需要在pom.xml文件中导入相关依赖,笔者这里使用的springboot,各位可以按需导入

        <!--    提供大量的自动注册功能    -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    
        <!--  java项目启动依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!--  rabbitmq依赖  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

刷新maven即可,接下来我们在application.properties文件中进行配置rabbitmq

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port= 5672
spring.rabbitmq.username= guest
spring.rabbitmq.password= guest
spring.rabbitmq.virtual-host= /


spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=2000

# ??????
spring.rabbitmq.listener.simple.acknowledge-mode= manual
spring.rabbitmq.listener.direct.acknowledge-mode= manual
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type = correlated

这里看不懂没关系啊,接下来会慢慢说明的

然后我们来建立config文件夹,并在其下创建DirectRabbitConfig.java文件哈,用于创建队列,交换机和声明路由键

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DirectRabbitConfig {
    
    //队列 起名:websocketDirectQueue
    @SuppressWarnings("checkstyle:MethodName")
    @Bean
    public Queue test1queue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
     // ttl:消息的存活时间,以毫秒为单位 // 队列中消息的过期时间 Map<String, Object> args2 = new HashMap<>(); args2.put("x-message-ttl", 60000); args2.put("x-dead-letter-exchange", "test1deadLetterDirectExchange"); args2.put("x-dead-letter-routing-key", "test1deadLetterDirectRouting"); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("test1queue", true, false, false, args2); } //Direct交换机 起名:websocketDirectExchange @SuppressWarnings("checkstyle:MethodName") @Bean DirectExchange test1DirectExchange() { return new DirectExchange("test1DirectExchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:websocketDirectRouting @Bean Binding test1DirectRouting() { return BindingBuilder.bind(test1queue()).to(test1DirectExchange()).with("test1DirectRouting"); } @Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); } }

这里边可以看到都有相应的注释哈,就不多废话了,接下来我们来定义一个生产者:testController

这边我是用的是接口,各位也可以使用测试类来进行

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/rabbitmq")
public class testController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/test")
    public void t(){
        for(int i = 0;i<10;i++){
            Map<String,Object> map = new HashMap<>();
            map.put("msg","这是第" + i + "个消息");
            map.put("data", Arrays.asList("helloworld",123,true));
            // 第一个参数:交换机名称,第二个参数:路由键名称,第三个参数:发送的数据
            rabbitTemplate.convertAndSend("test1DirectExchange","est1DirectRouting",map.toString());
        }
    }
}

创建ReceiveHandler.java文件来充当消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;


@Component
public class ReceiveHandler {
    
    int count = 0;
    
    
    // 使用RabbitListener绑定我们在config中声明的队列
    @RabbitListener(queues = "MyTestQueue")
    public void send_email(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws Exception {
        
        String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
        
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        
        try {
            
            System.out.println("receive message is:" + new String(message.getBody(), StandardCharsets.UTF_8));
            
        } catch (Exception ex) {
            
            System.out.println("重试次数" + count++);
            
            channel.basicAck(tag, false);
            
        }
    }
}

接下来我们调用接口,可以看到控制台信息

 

 可以看到,消费者成功消费了本次循环的信息

这就算是完成简单的使用了

在开发中,仅仅写成这样对程序的需求是远远不够的

我们还需要考虑消息的安全性以及幂等性

1、消息一定会到达交换机吗?

2、消息到达交换价之后,一定会到达队列吗?

3、如何保证消息的幂等性

等许多问题,时间不够用了,后续留给下次

标签:java,遇到,spring,springframework,rabbitmq,org,import
来源: https://www.cnblogs.com/gengjiangtao/p/16693844.html