spring boot和RabbitMQ整合实现
作者:互联网
1.基于API的方式
首先编写测试类:
代码如下:
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.example.demo.eight.User;
@RunWith(SpringRunner.class)
@SpringBootTest
public class HeimaApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Test
public void amqpAdmin() {
// 创建一个fanout类型的交换器
amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
// 定义两个持久化队列
amqpAdmin.declareQueue(new Queue("fanout_queue_email"));
amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));
// 将队列分别和交换器进行绑定
amqpAdmin.declareBinding(
new Binding("fanout_queue_email", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
amqpAdmin.declareBinding(
new Binding("fanout_queue_sms", Binding.DestinationType.QUEUE, "fanout_exchange", "", null));
}
}
然后打开RabbitMQ,发现创建成功:最后一个就是
消息发送者发送消息,发送消息借助一个实体类传递消息,所以创建一个实体类:
package com.example.demo.eight;
public class User {
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
private Integer id;
private String username;
@Override
public String toString() {
return “User [id=” + id + “, username=” + username + “]”;
}
}
然后在在测试类进行编写
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void psubPublisher() {
User user = new User();
user.setId(1);
user.setUsername("石头");
rabbitTemplate.convertAndSend("fanout_exchange", "", user);
}
运行时会出现错误,发送实体类对象消息是程序出现异常,需要定制其他类型的异常:
package com.example.demo.eight;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//自定义消息转换器
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
消息消费者接受消息:
package com.example.demo.eight;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQService {
//工作模式接受,处理邮件业务
@RabbitListener(queues = “fanout_queue_email”)
public void psubConsumerEmail(Message message) {
byte[] body=message.getBody();
String s=new String(body);
System.out.println(“邮件业务接受到消息”+s);
}
//监听
@RabbitListener(queues = "fanout_queue_sms")
public void psubConsumeSms(Message message) {
byte[] body=message.getBody();
String s=new String(body);
System.out.println("短信服务接受到消息"+s);
}
}
然后运行主程序
成功实现。
标签:spring,boot,springframework,fanout,RabbitMQ,import,org,new,public 来源: https://blog.csdn.net/qq_46199553/article/details/118706589