
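RabbitMQ - the mandatory parameter

Author: Internet

Reference: 《RabbitMQ实战指南》

mandatory and immediate are two parameters of the channel.basicPublish method. Both can return a message to the producer when the message cannot reach its destination.

RabbitMQ removed support for the immediate parameter in version 3.0, so it is not discussed further here.

1. Message publishing API: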

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

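Parameter description:

exchange: name of the exchange to publish to
routingKey: routing key the exchange uses to select queues
mandatory: if true, a message that cannot be routed to any queue is returned to the producer via Basic.Return; if false, the broker discards it
immediate: no longer supported since RabbitMQ 3.0
props: message properties (BasicProperties)
body: message payload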

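2. Receiving messages that were not routed correctly

With mandatory set to true, the producer registers a ReturnListener by calling channel.addReturnListener() to receive messages that could not be routed to a suitable queue. The following cases apply:

Note: if the message cannot be routed to an exchange at all (the exchange does not exist), the Basic.Return command is not triggered, so the listener never receives the message. Publishing to a nonexistent exchange instead raises a channel-level 404 NOT_FOUND error, which closes the channel.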
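
The rules above can be summarized in a small toy model. This is only a sketch of the decision the broker makes for a direct exchange, not RabbitMQ's implementation. The class name MandatoryRoutingSketch, the Outcome enum, and the bind/publish helpers are hypothetical names introduced for illustration; the reply codes in the comments (312 NO_ROUTE, 404 NOT_FOUND) are the ones AMQP 0-9-1 defines for these situations.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of how an unroutable publish is handled; illustrative only. */
public class MandatoryRoutingSketch {

    /** Possible outcomes of a publish in this model. */
    enum Outcome { ROUTED, RETURNED, DROPPED, CHANNEL_ERROR }

    // Exchange name -> routing keys that have at least one queue bound (direct exchange semantics)
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Registers an exchange with no bindings if it does not exist yet. */
    void declareExchange(String exchange) {
        bindings.computeIfAbsent(exchange, k -> new HashSet<>());
    }

    /** Records that some queue is bound to the exchange with the given routing key. */
    void bind(String exchange, String routingKey) {
        declareExchange(exchange);
        bindings.get(exchange).add(routingKey);
    }

    /** Decides what happens to a message published with the given mandatory flag. */
    Outcome publish(String exchange, String routingKey, boolean mandatory) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            // Unknown exchange: channel error (404 NOT_FOUND), no Basic.Return
            return Outcome.CHANNEL_ERROR;
        }
        if (keys.contains(routingKey)) {
            return Outcome.ROUTED;
        }
        // No matching queue: Basic.Return (312 NO_ROUTE) only when mandatory is true
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        MandatoryRoutingSketch broker = new MandatoryRoutingSketch();
        String exchange = "direct.exchange.test.mandatory";
        String key = "direct.routing-key.test.mandatory";
        broker.bind(exchange, key);

        System.out.println(broker.publish(exchange, key, true));
        System.out.println(broker.publish(exchange, key + "2", true));
        System.out.println(broker.publish(exchange, key + "2", false));
        System.out.println(broker.publish("no.such.exchange", key, true));
    }
}
```

Only the second publish in main corresponds to a message the ReturnListener would see; the third is silently discarded and the fourth fails at the channel level.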

3. Examples

3.1 Native API

// Obtaining the Connection and creating the Channel are omitted
// Declare the exchange
String exchangeName = "direct.exchange.test.mandatory";
channel.exchangeDeclare(exchangeName, "direct", true);
// Declare the queue
String queueName = "direct.queue.test.mandatory";
String queue = channel.queueDeclare(queueName, true, false, false, null).getQueue();
// Bind the exchange and the queue
String routingKey = "direct.routing-key.test.mandatory";
channel.queueBind(queue, exchangeName, routingKey);

// Register the listener before publishing so that no Basic.Return is missed
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(message);
    }
});

// Routable message
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.TEXT_PLAIN, "Test Msg".getBytes("UTF-8"));

// Unroutable message (no queue is bound with this routing key)
channel.basicPublish(exchangeName, routingKey + "2", true, MessageProperties.TEXT_PLAIN, "Test Msg2".getBytes("UTF-8"));

3.2 Spring Boot

(1) Add the RabbitMQ starter

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

(2) application.yml
Set the spring.rabbitmq.publisher-returns=true property and implement the org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback interface to receive messages that could not be routed.

Under the hood, this still calls channel.basicPublish() with mandatory set to true.

spring:
  rabbitmq:
    host: dev.tss.com
    port: 5672
    username: admin
    password: njittss
    # Enable publisher confirms
    # publisher-confirms: true
    # Enable returning unroutable messages; can also be set with rabbitTemplate.setMandatory(true)
    publisher-returns: true

rabbitmq:
  direct:
    test:
      mandatory:
        exchangeName: direct.exchange.test.mandatory
        queueName: direct.queue.test.mandatory
        routingKey: direct.routing-key.test.mandatory

(3) Add the listener

@Component
public class RabbitCallback implements RabbitTemplate.ReturnCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitCallback.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // mandatory can also be enabled this way
        // rabbitTemplate.setMandatory(true);

        // Each RabbitTemplate supports only one RabbitTemplate.ReturnCallback
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Invoked only when the exchange fails to route the message to a queue
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        LOGGER.warn("return callback, receive message :" + message.toString() + ", " + replyText + ", " + exchange + ", " + routingKey);
    }
}

(4) Send test messages
Calling sendAbnormalMessage() publishes a message that cannot be routed to any queue, so the listener receives it.

@Value("${rabbitmq.direct.test.mandatory.exchangeName}")
private String exchangeName;
@Value("${rabbitmq.direct.test.mandatory.routingKey}")
private String routingKey;

@Autowired
private RabbitTemplate rabbitTemplate;

// Send a routable message
public boolean sendNormalMessage() {
    String message = "test normal message";
    this.rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    return true;
}

// Send an unroutable message
public boolean sendAbnormalMessage() {
    String message = "test abnormal message";
    this.rabbitTemplate.convertAndSend(exchangeName, routingKey + "2", message);
    return true;
}

Finally, a look at how Spring sets mandatory when sending a message:

public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException {
	execute(new ChannelCallback<Object>() {
		@Override
		public Object doInRabbit(Channel channel) throws Exception {
			doSend(channel, exchange, routingKey, message,
				RabbitTemplate.this.returnCallback != null
					&& RabbitTemplate.this.mandatoryExpression.getValue(
						RabbitTemplate.this.evaluationContext, message, Boolean.class),
				correlationData);
			return null;
		}
	}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

protected void doSend(Channel channel, String exchange, String routingKey, Message message,
			boolean mandatory, CorrelationData correlationData) throws Exception {
    // ...
	Message messageToUse = message;
	MessageProperties messageProperties = messageToUse.getMessageProperties();
	if (mandatory) {
		messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION_KEY, this.uuid);
	}
	// ...
	// Publish the message
	channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());
	// ...
}
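
The condition passed to doSend in the excerpt above can be reduced to a two-input truth table. The sketch below models it with plain booleans, assuming the RabbitTemplate version quoted here; MandatoryFlagSketch and effectiveMandatory are hypothetical names for illustration and are not part of Spring AMQP.

```java
/** Models the mandatory decision from the quoted send() method; illustrative only. */
public class MandatoryFlagSketch {

    /**
     * Mirrors "returnCallback != null && mandatoryExpression value" from the excerpt.
     *
     * @param hasReturnCallback        whether a ReturnCallback has been set on the template
     * @param mandatoryExpressionValue the value the mandatory expression evaluates to
     * @return the flag that would be passed on to channel.basicPublish()
     */
    static boolean effectiveMandatory(boolean hasReturnCallback, boolean mandatoryExpressionValue) {
        return hasReturnCallback && mandatoryExpressionValue;
    }

    public static void main(String[] args) {
        boolean[] values = {true, false};
        for (boolean callback : values) {
            for (boolean mandatory : values) {
                System.out.println("callback=" + callback
                        + ", mandatory=" + mandatory
                        + " -> basicPublish mandatory=" + effectiveMandatory(callback, mandatory));
            }
        }
    }
}
```

In this version of the code, enabling mandatory without registering a ReturnCallback therefore still publishes with mandatory=false, which is why step (3) sets the callback explicitly.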

Source: https://blog.csdn.net/mytt_10566/article/details/90741398