springboot学习20
作者:互联网
异步消息-RabbitMQ及AMQP
RabbitMQ是 AMQP实现,AMQP 消息使用交换器的名称和路由键来寻址。
几种交换方式:
Default | Direct | Topic | Fanout | Headers | Dead letter |
一、RabbitTemplate发送消息
1、添加 RabbitMQ 到 Spring 中
pom.xml引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml或者application.properties中配置 RabbitMQ属性:
属性 | 描述 |
---|---|
spring.rabbitmq.addresses | 一个逗号分隔的 RabbitMQ Broker 地址列表 |
spring.rabbitmq.host | Broker 主机(默认为 localhost) |
spring.rabbitmq.port | Broker 端口(默认为 5672) |
spring.rabbitmq.username | 访问 Broker 的用户名(可选) |
spring.rabbitmq.password | 访问 Broker 的密码(可选) |
2、 RabbitTemplate 发送消息
主要方法:
// 发送原始消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
// 发送从对象转换过来的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
// 发送经过处理后从对象转换过来的消息,接受一个 MessagePostProcessor,
// 可以在消息对象发送到代理之前使用它来操作消息对象
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
1)使用RabbitTemplate.send() 发送消息,如:
package demo.springbootlearn.service.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import demo.springbootlearn.Order;
@Service
public class RabbitOrderMessagingService {
private RabbitTemplate rabbitTemplate;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// sendOrder方法1
public void sendOrder(Order order) {
MessageConverter converter = rabbitTemplate.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
// 指定了路由键demospringbootlearn.order。缺省了交换键。
// 默认交换,默认交换名称是 ""(一个空 String),默认的路由键也是 ""(一个空 String)
// 可以在application.yml和application.properties。设置 spring.rabbitmq.template.exchange 和 spring.rabbitmq.template.routing-key 属性来覆盖这些缺省值
rabbitTemplate.send("demospringbootlearn.order", message);
}
}
spring:
rabbitmq:
template:
exchange: demospringbootlearn.orders
routing-key: demospringbootlearn.shop.central
如上面代码的 sendOrder方法1,可以改成下面这样,从消息转换器创建消息对象:
// sendOrder方法2
public void sendOrder(Order order) {
rabbitTemplate.convertAndSend("demospringbootlearn.order", order);
}
2)配置消息转换器
使用SimpleMessageConverter 执行消息转换。可以将简单类型(如String)和可序列化对象转换为消息对象。
消息转换器 | 描述 |
---|---|
Jackson2JsonMessageConverter | 使用Jackson 2 JSON处理器将对象与 JSON 进行转换 |
MarshallingMessageConverter | 使用 Spring 的序列化和反序列化抽象转换 String 和任何类型的本地对象 |
SimpleMessageConverter | 转换 String、字节数组和序列化类型 |
ContentTypeDelegatingMessageConverter | 基于 contentType 头信息将对象委托给另一个 MessageConverter |
MessagingMessageConverter | 将消息转换委托给底层 MessageConverter,将消息头委托给 AmqpHeaderConverter |
3)设置消息属性
// sendOrder方法3
public void sendOrder(Order order) {
MessageConverter converter = rabbitTemplate.getMessageConverter();
MessageProperties props = new MessageProperties();
// 设置消息头
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbitTemplate.send("demospringbootlearn.order", message);
}
使用MessagePostProcessor:
// sendOrder方法4:
@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("demospringbootlearn.order.queue", order,
// 匿名内部类
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
// 从消息中获取 MessageProperties
MessageProperties props = message.getMessageProperties();
// 设置消息头
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
二、RabbitMQ接收消息
1)接收消息:
RabbitTemplate主要方法:
// 接收消息,从队列接收原始 Message 对象
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
// long 参数来表示接收消息的超时。默认情况下,接收超时为 0 毫秒。
// 对 receive() 的调用将立即返回,如果没有可用的消息,则可能返回空值。
Message receive(long timeoutMillis) throws AmqpException;
// 传入超时值,可以让 receive()方法阻塞,直到消息到达或超时过期。
// 使用非零超时,也要处理返回的 null 值。
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// 接收从消息转换过来的对象,返回消息之前使用消息转换器将其转换为域对象。
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
// 传入超时值,可以让 receiveAndConvert()方法阻塞,直到消息到达或超时过期。
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// 接收从消息转换过来的类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
RabbitOrderReceiver
:
package demo.springbootlearn.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbitTemplate;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.converter = rabbitTemplate.getMessageConverter();
}
// receiveOrder方法1
public Order receiveOrder() {
// receive方法不提供超时值,因此只能假设调用立即返回 Message 或 null。
Message message = rabbitTemplate.receive("demospringbootlearn.order");
return message != null
? (Order) converter.fromMessage(message)
: null;
}
// receiveOrder方法2
// 将 receiveOrder() 方法更改为传递一个 60,000 毫秒的延迟后再调用 receive()
public Order receiveOrder() {
Message message = rabbitTemplate.receive("demospringbootlearn.order.queue", 60000);
return message != null
? (Order) converter.fromMessage(message)
: null;
}
}
上面RabbitOrderReceiver代码中,60000毫秒是硬编码。可以写在配置文件中在application.properties
或者application.yml
,使用 spring.rabbitmq.template.receive-timeout 属性设置它。 然后使用@ConfigurationProperties
注解:
如:
application.yml
:
spring:
rabbitmq:
template:
receive-timeout: 60000
或者:
application.properties
:
spring.rabbitmq.template.receive-timeout=60000
使用 receiveAndConvert(),重写上面的receiveOrder方法3:
// receiveOrder方法3
public Oreder receiveOrder() {
return (Order) rabbitTemplate.receiveAndConvert("demospringbootlearn.order.queue");
}
方法3还有Object转order类型,可以用ParameterizedTypeReference来接收对象,如:
// receiveOrder方法4
public Order receiveOrder() {
return rabbitTemplate.receiveAndConvert("demospringbootlearn.order.queue",
new ParameterizedTypeReference<Order>() {});
}
注:使用 receiveAndConvert() 的 ParameterizedTypeReference 的惟一要求是消息转换器必须是 SmartMessageConverter 的实现。
2)使用监听器处理 RabbitMQ 消息
Spring 提供了 RabbitListener
,在bean方法上使用@RabbitTemplate
进行注解 。
package demo.springbootlearn.messaging.rabbit.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
// ...
@RabbitListener(queues = "demospringbootlearn.order.queue")
public void receiveOrder(Order order) {
//...
}
}
标签:rabbitTemplate,20,springboot,order,学习,AmqpException,message,throws,String 来源: https://blog.csdn.net/tongwudi5093/article/details/113803428