其他分享
首页 > 其他分享> > 【RabbitMQ 笔记】— 方法解读

【RabbitMQ 笔记】— 方法解读

作者:互联网

通过上一篇【RabbitMQ 笔记】— 基本概念,知道生产者和消费者使用到的主要类和接口有 ConnectionFactory、Connection、Channel、Consumer 等。Connection 是用来开启 Channel 的,RabbitMQ 开发工作也基本上是围绕 Connection 和 Channel 这两个类展开。

连接 RabbitMQ

Use of the isOpen() method of channel and connection objects is not recommended for production code, because the value returned by the method is dependent on the existence of the shutdown cause. The following code illustrates the possibility of race conditions:

private volatile ShutdownSignalException shutdownCause = null;

@Override
public boolean isOpen() {
    synchronized(this.monitor) {
        return this.shutdownCause == null;
    }
}

从代码中可以看到,isOpen 方法主要是看 shutdownCause 是不是 null,shutdownCause 变量使用 volatile 修饰,则在可见性上没问题,那么官方所说的竞态条件是什么呢?看下面代码

public void brokenMethod(Channel channel) {
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and basicQos(1) call
        ...
        channel.basicQos(1);
    }
}

首先通过 isOpen 看 Channel是否开启,如果开启则调用相关方法。试想如果 channel 在 isOpen 方法和 basicQos 方法之间关闭了呢,那是不是就有问题。所以官方建议使用者正常情况下直接无视 isOpen 这种检查,简单的认为 Channel 处于开启状态。如果在执行过程中,Channel 关闭了,会抛出 ShutdownSignalException 异常,只用捕获异常就可以了。

交换器和队列

exchangeDeclare

exchangeDeclare 方法定义如下:

Exchange.DeclareOk exchangeDeclare(String exchange, 
                                    String type, 
                                    boolean durable, 
                                    boolean autoDelete,
                                    boolean internal,
                                    Map<String, Object> arguments) throws IOException;

各个参数说明如下:

queueDeclare

queueDeclare 方法定义如下:

Queue.DeclareOk queueDeclare(String queue, 
                            boolean durable, 
                            boolean exclusive, 
                            boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;

各个参数说明如下:

关于排他需要注意三点:

  1. 排他队列是基于连接(Connection)可见的,也就是同一连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列的。
  2. “首次”是指一个连接如果已经声明了一个排他队列,那么其他连接不能再创建同名的排他队列,这个与普通队列不同。
  3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会自动删除。这种队列适合一个客户端同时发送消息和读取消息的场景。

queueBind

交换器和队列绑定的方法 queueBind 定义如下

Queue.BindOk queueBind(String queue, 
                        String exchange, 
                        String routingKey, 
                        Map<String, Object> arguments) throws IOException;

参数说明如下:

还有对应解绑的方法 queueUnbind,参数说明同上

Queue.UnbindOk queueUnbind(String queue, 
                            String exchange, 
                            String routingKey, 
                            Map<String, Object> arguments) throws IOException;

exchangeBind

我们不仅可以将交换器和队列绑定,还可以将交换器和交换器绑定,exchangeBind 定义如下

void exchangeBindNoWait(String destination, 
                        String source, 
                        String routingKey, 
                        Map<String, Object> arguments) throws IOException;

示例图如下image

消息从 source 交换器转发到 destination 交换器,某种程度上说 destination 交换器可以看做是一个队列,示例代码如下,Demo 参考我的 GitHub

String sourceExchangeName = "source_direct_exchange_demo";
String destinationExchangeName = "destination_fanout_exchange_demo";
String queueName = "exchange_queue_demo";
// source 和 destination 的绑定键
String xxBindingKey = "x_x_binding_key_demo";
Channel channel = null;
try {
    // 创建通道
    channel = connection.createChannel();
    // 创建交换器
    channel.exchangeDeclare(sourceExchangeName, "direct", true, false, null);
    channel.exchangeDeclare(destinationExchangeName, "fanout", true, false, null);
    channel.exchangeBind(destinationExchangeName, sourceExchangeName, xxBindingKey);
    // 创建队列
    channel.queueDeclare(queueName, true, false, false, null);
    // destination 交换器和队列绑定
    // 因为 destination 交换器类型声明为 fanout,是忽略绑定键的,所以这里RoutingKey设为""
    channel.queueBind(queueName, destinationExchangeName, "");
    // 发送消息的时候,需要的是 RoutingKey
    String message = "exchange Hello World " + new SimpleDateFormat("mm:ss").format(new Date());
    channel.basicPublish(sourceExchangeName, xxBindingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
} catch(Exception e) {
    throw new RuntimeException("exchange error:" + e);
} finally {
    // 关闭资源
}

发送消息

发送消息有下列的重载方法

void basicPublish(String exchange, 
                    String routingKey, 
                    BasicProperties props, 
                    byte[] body) throws IOException;
                    
void basicPublish(String exchange, 
                    String routingKey, 
                    boolean mandatory, 
                    BasicProperties props, 
                    byte[] body) throws IOException;
                    
void basicPublish(String exchange, 
                    String routingKey, 
                    boolean mandatory, 
                    boolean immediate, 
                    BasicProperties props, 
                    byte[] body) throws IOException;

参数说明如下:

消费消息

RabbitMQ 消费消息模式分为两种:推(Push)模式和拉(Pull)模式。推模式采用 Basic.Consume 进行消费,拉模式采用 Basic.Get 进行消费。

推模式

在推模式中,可以通过持续订阅的方式来获取消息,使用到的相关类有:

com.rabbitmq.client.Consumer;
com.rabbitmq.client.DefaultConsumer;

接受消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 实现。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        System.out.println("recv msg:" + new String(body));
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 手动确认
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};
// 默认 autoAck = false
channel.basicConsume(EXCHANGE_QUEUE_NAME, false, consumer);

上面代码中设置自动确认为 false,然后在接收到消息后显式的确认,这对消费者是非常有必要的,可以防止消息不必要的丢失。

basicConsume 方法如下

String basicConsume(String queue, // 队列名称
                    boolean autoAck, // 是否自动确认
                    String consumerTag, // 消费者标签,用来区分多个消费者
                    boolean noLocal, // true表示不能将同一Connection中生产者发送的消息传送给这个Connection中的消费者
                    boolean exclusive, // 是否排他
                    Map<String, Object> arguments, // 参数 
                    Consumer callback // 设置消费者回调函数
                    ) throws IOException;

拉模式

拉模式的方法定义如下

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

注意:Basic.Consume 将信道(Channel)设置为接收模式,直到队列取消订阅为止。在接收模式期间,RabbitMQ 会源源不断的向消费者推送消息,当然消息的数量还会受到 Basic.Qos 的限制。如果只是想获取单条消息,建议还是使用 Basic.Get。但是不要把 Basic.Get 放在循环中来代替 Basic.Comsume,这样会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume。

消费端的确认和拒绝

确认

为了保证消息能够可靠地到达消费者,RabbitMQ 提供了消息确认机制。在消费者订阅队列时,可以设置 autoAck 来指定是否自动确认。

拒绝

消费者受到消息后也可以使用 Basic.Reject 命令进行拒绝。

Channel.basicReject 方法定义如下

void basicReject(long deliveryTag, boolean requeue) throws IOException;

当然消费者也可以批量拒绝消息,方法定义如下

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

参数说明如下

参考

内容整理自 《RabbitMQ实战指南》

标签:交换器,String,队列,笔记,解读,boolean,RabbitMQ,channel,消息
来源: https://www.cnblogs.com/tailife/p/16358949.html