其他分享
首页 > 其他分享> > RabbitMQ(1)消息队列中间件使用

RabbitMQ(1)消息队列中间件使用

作者:互联网

RabbitMQ消息队列中间件使用

RabbitMQ 是部署最广泛的开源消息代理。RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-MobileRuntastic,RabbitMQ在全球范围内用于小型初创企业和大型企业。

RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。

RabbitMQ的介绍

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ 代表使用者保留的消息缓冲区。

(P) -> [|||] -> (C)

生产者、使用者和代理不必驻留在同一台主机上。事实上,在大多数应用程序中,它们都没有在一台主机上。因此应用程序也可以既是生产者又是消费者。

RabbitMQ的特点

Docker安装RabbitMQ

一、Docker安装erlang

由于RabbitMQ是erlang语言开发的,所以我们在安装RabbitMQ前先安装erlang,以便运行RabbitMQ,我们本次使用Docker安装因此只需要直接拉取容器即可。通过Docker搜索容器命令docker search --limit 5 erlang搜索到前5条erlang相关容器信息。

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 erlang
NAME                                    DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
erlang                                  Erlang is a programming language used to bui…   330       [OK]       
erlangsolutions/wombatoam               WombatOAM image without a license key           2                    
circleci/erlang                         CircleCI images for Erlang                      0                    
erlang/ubuntu-build                     Ubuntu based build images                       0                    
erlangsolutions/node-with-build-tools   https://github.com/esl/node-with-build-tools    0   

我直接拉取最新容器:docker pull erlang,如需要拉取特定版本的erlang和RabbitMQ则需要版本对应RabbitMQ Erlang 版本对应要求 — RabbitMQ

验证erlang是否安装完成,在拉取容器后:docker run correl/erlang echo "hello word",如果回馈"hello word"则完成拉取。

root@iZ059o7jp1sn1wZ:~# docker run erlang echo "hello word"
hello word

本次运行erlang就不配置映射文件了,直接运行docker run -it --name 别名 端口号,需要设置配置文件挂载等请看另一篇博客Docker使用相关指令

二、Docker安装RabbitMQ

安装完erlang后开始安装RabbitMQ,docker search --limit 5 rabbitmq查询你需要的版本,这边直接拉取最新镜像docker pull rabbitmq

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 rabbitmq
NAME                        DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
rabbitmq                    RabbitMQ is an open source multi-protocol me…   4349      [OK]       
bitnami/rabbitmq            Bitnami Docker Image for RabbitMQ               86                   [OK]
bitnami/rabbitmq-exporter                                                   1                    
circleci/rabbitmq-delayed   https://github.com/circleci/rabbitmq-delayed…   1                    
circleci/rabbitmq           This image is for internal use                  0  

挂载安装RabbitMQ:docker run -d --name=rabbitmq -p 5672:5672 -p 15672:15672 -v /docker/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq

Web可视化端使用此用户密码登录,如果没设置用户账号和密码,默认的账号和密码为guest/guest。使用docker ps查看是否正常运行RabbitMQ和erlang。

注意: guest用户只能本地(localhost、127.0.0.1)访问Web可视化管理界面,其他IP访问需另创用户并授权

root@iZ059o7jp1sn1wZ:~# docker ps
CONTAINER ID   IMAGE                       COMMAND                  CREATED         STATUS         PORTS                                                                                                                                      NAMES
40d6fe910b2e   rabbitmq                    "docker-entrypoint.s…"   4 minutes ago   Up 4 minutes   4369/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 5671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
6a499e7c356c   0b68d51c5f30                "erl"                    3 hours ago     Up 3 hours                                                                                                                                                erlang

安装Web可视化插件

进入RabbitMQ容器docker exec -it 容器ID /bin/bash,安装可视化插件rabbitmq-plugins enable rabbitmq_management

root@iZ059o7jp1sn1wZ:~# docker exec -it 40d6fe910b2e /bin/bash
root@40d6fe910b2e:/# rabbitmq-plugins enable rabbitmq_management

Enabling plugins on node rabbit@40d6fe910b2e:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@40d6fe910b2e...
The following plugins have been enabled:
  rabbitmq_management

started 1 plugins.

web进入可视化界面IP:15672,密码为上面设定RabbitMQ的账号密码。

image-20220621113758887

登录成功后的页面

image-20220621113819614

RabbitMQ初の体验(Hello Word)

首先安装相关包,我们可以在RabbitMQ官网找到相关依赖包,RabbitMQ连接需要SLF4J依赖,本次简单的RabbitMQ程序SLF4J Simple已经够用,但你应该在生产中使用一个完整的日志记录库,如Logback

image-20220623153417303

发送

(P) -> [|||]

发布者将连接到 RabbitMQ,发送一条消息,然后退出。

代码部分

package Demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        // 1、创建链接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("114.55.34.91");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        // 2、链接
        try (Connection connection = factory.newConnection();
             // 3、获取通道
             Channel channel = connection.createChannel()) {
            /**
             * 1、name:    队列名称
             * 2、durable: 是否持久化
             * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
             * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
             * 5、arguments:参数
             * */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

arguments参数:

运行结果

Connected to the target VM, address: '127.0.0.1:60528', transport: 'socket'
 [x] Sent 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:60528', transport: 'socket'

Process finished with exit code 0

用Debug运行到连接的时候,我们可以在可视化页面看见连接的用户IP和用户名字

image-20220623153923360

接收

[|||]-> (C)

这就是我们的出版商。我们的消费者监听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将让消费者运行以监听消息并将其打印出来。

我们将使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。

设置与发送者相同,我们都需要打开同一个队列,需要和发布者发布的队列一样。

public class Consumer {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

请注意,我们在此处也声明队列。由于我们可能会在发布者之前启动使用者,因此我们希望在尝试使用队列中的消息之前确保队列存在。

我们为什么不使用try-with-resource语句来自动关闭通道和连接?因为我们希望当消费者异步侦听消息到达时,进程保持活动状态。

我们要从服务器队列中拿取消息。由于它将异步推送消息,因此我们以对象的形式提供回调,该回调将缓冲消息,直到我们要使用它们。这就是DeliverCallback子类的作用。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

代码部分

package Demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("114.55.34.91");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // DeliverCallback缓冲服务器推送给我们的消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

RabbitMQ工作队列

在上面的教程中,我们学会了接受和发送消息,本章节对MQ的接收者消息进行处理。

img

轮询机制

当多个消费者对MQ存储消息进行接收,每个消费者都分配一条,到消息全部被消费。使用任务队列的优点之一是能够轻松并行化工作。如果我们正在积累积压的工作,我们可以添加更多的消费者来进行扩展。

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮循机制。将上文发送代码改为,输入来进行轮询测试。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 输入参数
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }

创建2个线程来进行消费,我们可以直接在idea内创建2个线程来进行处理。

image-20220702135152222

image-20220702135400029

创建好2个消费者线程后,启动消费者线程进行消费监听,然后启动发送者发送消息来处理。

发送者:

image-20220702135528889

消费者1:

image-20220702135555780

消费者2:

image-20220702135635700

结论:消息发送线程,发送消息时工作线程会轮询得到消息发送线程发送的消息,这是一种公平的策略,但是这种方式效率较低,在实际工作中一般采用不公平的策略。

标签:队列,中间件,factory,rabbitmq,RabbitMQ,docker,erlang
来源: https://www.cnblogs.com/HeiDaotu/p/16437652.html