其他分享
首页 > 其他分享> > 春季-如何同时使用Rabbit连接缓存模式和自动声明功能?

春季-如何同时使用Rabbit连接缓存模式和自动声明功能?

作者:互联网

Spring AMQP Reference说:

Starting with version 1.3, the CachingConnectionFactory can be configured to cache connections as well as just channels. In this case, each call to createConnection() creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache (if the cache size has not been reached). Channels created on such connections are cached too. The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to connect to different cluster members. Set the cacheMode to CacheMode.CONNECTION.

我在负载均衡器后面有一个HA群集,并且可以清楚地看到,当我使用CacheMode.CHANEL时,我所有的通道都来自单个连接,并且它们都位于同一主机中.

当我更改为CacheMode.CONNECTION时,确实看到了通道中的差异,即,并非所有通道都位于同一连接中,因此,我看到正在使用的群集中的主机不同.

因此,我认为在这种特殊情况下,第二种配置对我来说很有意义.但是,文档还说:

When the cache mode is CONNECTION, automatic declaration of queues etc. is NOT supported.

显然,这也是一个重要功能,因为我不想手动对配置进行任何更改.

所以,我的问题是,有没有办法我可以同时使用这两件事?拥有两个连接工厂,一个用于RabbitAdmin,另一个用于RabbitTemplate,侦听器等是否有意义?

解决方法:

问题在于我们不知道在哪个连接上声明元素.我们最终将为创建的每个新连接重新声明所有内容.

如果可以的话,您可以将自己的连接侦听器添加到连接工厂,然后在RabbitAdmin bean上调用rabbitAdmin.initialize()(如果缓存模式为通道,则这是Admin内部的默认侦听器).

或者,您可以继承RabbitAdmin的子类并覆盖afterPropertiesSet().

我想您可以添加更多逻辑以确定您是否确实需要执行声明-请参见RabbitAdmin.afterPropertiesSet()…

/**
 * If {@link #setAutoStartup(boolean) autoStartup} is set to true, registers a callback on the
 * {@link ConnectionFactory} to declare all exchanges and queues in the enclosing application context. If the
 * callback fails then it may cause other clients of the connection factory to fail, but since only exchanges,
 * queues and bindings are declared failure is not expected.
 *
 * @see InitializingBean#afterPropertiesSet()
 * @see #initialize()
 */
@Override
public void afterPropertiesSet() {

    synchronized (this.lifecycleMonitor) {

        if (this.running || !this.autoStartup) {
            return;
        }

        if (this.connectionFactory instanceof CachingConnectionFactory &&
                ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
            this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
            return;
        }

        // Prevent stack overflow...
        final AtomicBoolean initializing = new AtomicBoolean(false);

        this.connectionFactory.addConnectionListener(connection -> {

            if (!initializing.compareAndSet(false, true)) {
                // If we are already initializing, we don't need to do it again...
                return;
            }
            try {
                /*
                 * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
                 * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
                 * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
                 * declared for every connection. If anyone has a problem with it: use auto-startup="false".
                 */
                initialize();
            }
            finally {
                initializing.compareAndSet(true, false);
            }

        });

        this.running = true;

    }
}

我们可以将其设为管理员选项-随时设置contribution.

标签:spring-amqp,spring-rabbit,spring-rabbitmq,spring
来源: https://codeday.me/bug/20191111/2021365.html