其他分享
首页 > 其他分享> > 消息中间件RabbitMQ(五)——实现RPC调用

消息中间件RabbitMQ(五)——实现RPC调用

作者:互联网

文章目录

1. RPC

对于微服务开发者,对于 RPCRemote Procedure Call Protocol 远程过程调用协议)并不会陌生吧, RESTful APIDubboWebService等都是RPC的实现调用

RabbitMQ中也提供了 RPC 功能,并且使用起来很简单,下面就来学习一下

2. 实现原理

再来熟悉下原理图

在这里插入图片描述

上图把RPC的过程描述的很清楚:

3. 代码实现

3.1 客户端实现

客户端配置文件:application.properties

server.port=8889
spring.rabbitmq.host=192.168.3.157
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 开启消息确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

spring.rabbitmq.publisher-confirm-type=correlated这项配置作用是:通过 correlated来确认消息。

只有开启了这个配置,将来的消息中才会带 correlation_id,只有通过 correlation_id才能将发送的消息和返回值之间关联起来

客户端配置类:

package com.scorpios.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RPCRabbitMQConfig {


    // 交换机的名称
    public static final String SCORPIOS_RPC_EXCHANGE_NAME = "scorpios_rpc_exchange_name";


    // 发送队列名称
    public static final String SCORPIOS_RPC_MSG_QUEUE = "scorpios_rpc_msg_queue";

    // 返回队列名称
    public static final String SCORPIOS_RPC_REPLY_QUEUE = "scorpios_rpc_reply_queue";

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange(RPCRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue queueOne() {
        return new Queue(RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE,true,false,false);
    }

    @Bean
    Queue queueTwo() {
        return new Queue(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE,true,false,false);
    }


    /**
     * 请求队列和交换器绑定
     */
    @Bean
    Binding bindingMsg(){
        return BindingBuilder.bind(queueOne()).to(topicExchange()).with(RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE);
    }

    /**
     * 返回队列和交换器绑定
     */
    @Bean
    Binding bindingReply(){
        return BindingBuilder.bind(queueTwo()).to(topicExchange()).with(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
    }

    /**
     * 自定义 RabbitTemplate发送和接收消息,因为要设置回调队列地址
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
       RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
       rabbitTemplate.setReplyAddress(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
       rabbitTemplate.setReplyTimeout(5000);
       return rabbitTemplate;
    }

    /**
     * 给返回队列设置监听器
     */
    @Bean
    SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RPCRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
        container.setMessageListener(rabbitTemplate(connectionFactory));
        return container;
    }

}

上面代码解释说明:

下面来编写消息发送代码:

@Slf4j
@RestController
public class RabbitMQController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send/message")
    public String send(String message) {

        // 创建消息对象
        Message newMessage = MessageBuilder.withBody(message.getBytes()).build();

        log.info("Client 发送的消息为:{}", newMessage);

        // 客户端给消息队列发送消息,并返回响应结果
        Message result = rabbitTemplate.sendAndReceive(RPCRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME, RPCRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE, newMessage);

        String response = "";
        if (result != null) {
            // 获取已发送的消息的 correlationId
            String correlationId = newMessage.getMessageProperties().getCorrelationId();
            log.info("发送消息的correlationId为:{}", correlationId);

            // 获取响应头信息
            HashMap<String, Object> headers = (HashMap<String, Object>) result.getMessageProperties().getHeaders();

            // 获取 server 返回的消息 correlationId
            String msgId = (String) headers.get("spring_returned_message_correlation");

            // 将已发送的消息的 correlationId与server返回的消息 correlationId进行对比,相同则取出响应结果
            if (msgId.equals(correlationId)) {
                response = new String(result.getBody());
                log.info("client 收到的响应结果为:{}", response);
            }
        }
        return response;
    }

}

解释说明:

注意:如果没有在 application.properties 中配置 correlated,发送的消息中就没有 correlation_id,这样就无法将返回的消息内容和发送的消息内容关联起来

3.2 服务端实现

服务端配置文件 application.properties与客户端中的配置文件一致

服务端配置类:

@Configuration
public class RPCServerRabbitMQConfig {


    // 交换机的名称
    public static final String SCORPIOS_RPC_EXCHANGE_NAME = "scorpios_rpc_exchange_name";


    // 发送队列名称
    public static final String SCORPIOS_RPC_MSG_QUEUE = "scorpios_rpc_msg_queue";

    // 返回队列名称
    public static final String SCORPIOS_RPC_REPLY_QUEUE = "scorpios_rpc_reply_queue";

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange(RPCServerRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME,true,false);
    }

    @Bean
    Queue queueOne() {
        return new Queue(RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE,true,false,false);
    }

    @Bean
    Queue queueTwo() {
        return new Queue(RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE,true,false,false);
    }


    @Bean
    Binding bindingMsg(){
        return BindingBuilder.bind(queueOne()).to(topicExchange()).with(RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE);
    }

    @Bean
    Binding bindingReply(){
        return BindingBuilder.bind(queueTwo()).to(topicExchange()).with(RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE);
    }

}

最后我们再来看下消息的消费:

@Slf4j
@Component
public class RpcServerConsumer {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 此消费者消费msgQueue队列中的消息
    @RabbitListener(queues = RPCServerRabbitMQConfig.SCORPIOS_RPC_MSG_QUEUE)
    public void process(Message msg) {
        log.info("server 收到msgQueue队列中的消息为 : {}",msg.toString());
        Message response = MessageBuilder.withBody(("我是服务端Server,收到的消息为:"+new String(msg.getBody())).getBytes()).build();
        // 把收到的原消息的CorrelationId取出
        CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
        // 想replyQueue队列发送确认消息
        rabbitTemplate.sendAndReceive(RPCServerRabbitMQConfig.SCORPIOS_RPC_EXCHANGE_NAME, RPCServerRabbitMQConfig.SCORPIOS_RPC_REPLY_QUEUE, response, correlationData);
    }
    
}

解释说明:

3.3 测试

启动ClientServer服务,并在浏览器中输入:http://localhost:8889/send/scorpios

Client服务日志:

在这里插入图片描述

Server服务日志:

在这里插入图片描述

浏览器响应结果:
在这里插入图片描述

4. 小结

再来看一下这个原理图:

在这里插入图片描述

代码地址:https://github.com/Hofanking/springboot-rabbitmq-example

springboot-rabbitmq-rpc-client

springboot-rabbitmq-rpc-server

标签:发送,队列,RabbitMQ,QUEUE,RPC,消息,SCORPIOS,消息中间件
来源: https://blog.csdn.net/zxd1435513775/article/details/122797913