java中使用rabbitmq以及遇到的问题
作者:互联网
现在是北京时间2022/09/14/17:21,天气渐微凉,浅聊一下java中如何使用rabbitmq,
估计能看到这里,想必你肯定翻阅了很多博客了,那么废话不多说,上代码
那么,首先,我们需要在pom.xml文件中导入相关依赖,笔者这里使用的springboot,各位可以按需导入
<!-- 提供大量的自动注册功能 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <!-- java项目启动依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- rabbitmq依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
刷新maven即可,接下来我们在application.properties文件中进行配置rabbitmq
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port= 5672 spring.rabbitmq.username= guest spring.rabbitmq.password= guest spring.rabbitmq.virtual-host= / spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.max-attempts=3 spring.rabbitmq.listener.simple.retry.initial-interval=2000 # ?????? spring.rabbitmq.listener.simple.acknowledge-mode= manual spring.rabbitmq.listener.direct.acknowledge-mode= manual spring.rabbitmq.publisher-returns=true spring.rabbitmq.publisher-confirm-type = correlated
这里看不懂没关系啊,接下来会慢慢说明的
然后我们来建立config文件夹,并在其下创建DirectRabbitConfig.java文件哈,用于创建队列,交换机和声明路由键
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import java.util.HashMap; import java.util.Map; @Configuration public class DirectRabbitConfig { //队列 起名:websocketDirectQueue @SuppressWarnings("checkstyle:MethodName") @Bean public Queue test1queue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// ttl:消息的存活时间,以毫秒为单位 // 队列中消息的过期时间 Map<String, Object> args2 = new HashMap<>(); args2.put("x-message-ttl", 60000); args2.put("x-dead-letter-exchange", "test1deadLetterDirectExchange"); args2.put("x-dead-letter-routing-key", "test1deadLetterDirectRouting"); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("test1queue", true, false, false, args2); } //Direct交换机 起名:websocketDirectExchange @SuppressWarnings("checkstyle:MethodName") @Bean DirectExchange test1DirectExchange() { return new DirectExchange("test1DirectExchange", true, false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:websocketDirectRouting @Bean Binding test1DirectRouting() { return BindingBuilder.bind(test1queue()).to(test1DirectExchange()).with("test1DirectRouting"); } @Bean public Jackson2JsonMessageConverter producerJackson2MessageConverter() { return new Jackson2JsonMessageConverter(); } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); } }
这里边可以看到都有相应的注释哈,就不多废话了,接下来我们来定义一个生产者:testController
这边我是用的是接口,各位也可以使用测试类来进行
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @RestController @RequestMapping("/rabbitmq") public class testController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/test") public void t(){ for(int i = 0;i<10;i++){ Map<String,Object> map = new HashMap<>(); map.put("msg","这是第" + i + "个消息"); map.put("data", Arrays.asList("helloworld",123,true)); // 第一个参数:交换机名称,第二个参数:路由键名称,第三个参数:发送的数据 rabbitTemplate.convertAndSend("test1DirectExchange","est1DirectRouting",map.toString()); } } }
创建ReceiveHandler.java文件来充当消费者
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @Component public class ReceiveHandler { int count = 0; // 使用RabbitListener绑定我们在config中声明的队列 @RabbitListener(queues = "MyTestQueue") public void send_email(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); try { System.out.println("receive message is:" + new String(message.getBody(), StandardCharsets.UTF_8)); } catch (Exception ex) { System.out.println("重试次数" + count++); channel.basicAck(tag, false); } } }
接下来我们调用接口,可以看到控制台信息
可以看到,消费者成功消费了本次循环的信息
这就算是完成简单的使用了
在开发中,仅仅写成这样对程序的需求是远远不够的
我们还需要考虑消息的安全性以及幂等性
1、消息一定会到达交换机吗?
2、消息到达交换价之后,一定会到达队列吗?
3、如何保证消息的幂等性
等许多问题,时间不够用了,后续留给下次
标签:java,遇到,spring,springframework,rabbitmq,org,import 来源: https://www.cnblogs.com/gengjiangtao/p/16693844.html