RabbitMQ (for beginners)

Author: Internet

RabbitMQ

Installing RabbitMQ

Installed version: 3.8.9
GitHub release: https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.9

Recommended download page for the Erlang environment: https://www.erlang-solutions.com/resources/download.html

![image.png](https://www.icode9.com/i/ll/?i=img_convert/e728c346b785f39b1b69ef8aad0b6ccf.png#align=left&display=inline&height=665&margin=[object Object]&name=image.png&originHeight=665&originWidth=725&size=57582&status=done&style=none&width=725)

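RabbitMQ is tightly coupled to the Erlang version it runs on, so Erlang 23 is used here.

# Install Erlang and RabbitMQ


# Install Erlang
rpm -ivh esl-erlang_23.2.1-1_centos_7_amd64.rpm


# Install RabbitMQ
rpm -ivh rabbitmq-server-3.8.9-1.el6.noarch.rpm

# The Erlang package may depend on other system packages, so install any that rpm reports as missing

# Start RabbitMQ: service rabbitmq-server start

# Enable the management UI: rabbitmq-plugins enable rabbitmq_management

Startup
Open this address in a web browser: http://127.0.0.1:15672/
Log in with the default account: guest   password: guest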

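Fixing the client login failure (User can only log in via localhost), for recent versions

1. Environment variables can be used to override the location of the configuration files (officially recommended):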

# overrides primary config file location
RABBITMQ_CONFIG_FILE=/path/to/a/custom/location/rabbitmq.conf

# overrides advanced config file location
RABBITMQ_ADVANCED_CONFIG_FILE=/path/to/a/custom/location/advanced.config

# overrides environment variable file location
RABBITMQ_CONF_ENV_FILE=/path/to/a/custom/location/rabbitmq-env.conf

Example rabbitmq.conf: https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example

1. Create the file rabbitmq.conf in the /etc/rabbitmq/ directory
2. Edit it:
## The default "guest" user is only permitted to access the server
## via a loopback interface (e.g. localhost).
## {loopback_users, [<<"guest">>]},
##
# loopback_users.guest = true

## Uncomment the following line if you want to allow access to the
## guest user from anywhere on the network.
loopback_users.guest = false

# Restart RabbitMQ; the guest user can then log in remotely

RabbitMQ management UI

1. Add a virtual host

![image.png](https://www.icode9.com/i/ll/?i=img_convert/d37ef91ce7266662bbaabda8b0ea023a.png#align=left&display=inline&height=656&margin=[object Object]&name=image.png&originHeight=656&originWidth=1849&size=58308&status=done&style=shadow&width=1849)

2. Add a user

![image.png](https://www.icode9.com/i/ll/?i=img_convert/c762a16fab539acc46673268842829bf.png#align=left&display=inline&height=676&margin=[object Object]&name=image.png&originHeight=676&originWidth=1867&size=64716&status=done&style=shadow&width=1867)

Assign the virtual host to the user

![image.png](https://www.icode9.com/i/ll/?i=img_convert/60fd75aae11a57699c82512ee9e94387.png#align=left&display=inline&height=762&margin=[object Object]&name=image.png&originHeight=762&originWidth=1878&size=62710&status=done&style=shadow&width=1878)

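RabbitMQ models

1. Direct connection (Hello World)

In this part of the tutorial we write two programs in Java: a producer that sends a single message, and a consumer that receives messages and prints them. We skip over most of the details of the Java API and concentrate on this very simple case, just to get started. It is the "Hello World" of messaging.
In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue, a message buffer that RabbitMQ keeps on behalf of the consumer.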

![image.png](https://www.icode9.com/i/ll/?i=img_convert/c37347c3f6e702f305c8504f63f59aed.png#align=left&display=inline&height=122&margin=[object Object]&name=image.png&originHeight=122&originWidth=355&size=5392&status=done&style=none&width=355)

(1) Add the RabbitMQ Java client dependency

<!--rabbitmq-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency> 

Producer

/**
 * Producer
 * @throws IOException
 * @throws TimeoutException
 */
@Test
void provider() throws IOException, TimeoutException {

    // Create the connection factory used to connect to the broker
    ConnectionFactory connectionFactory = new ConnectionFactory();

    // Host of the RabbitMQ server
    connectionFactory.setHost("192.168.128.144");

    // Port
    connectionFactory.setPort(5672);
    // Virtual host
    connectionFactory.setVirtualHost("/test");

    // Username and password
    connectionFactory.setUsername("nf");
    connectionFactory.setPassword("123456");

    // Open a connection
    Connection connection = connectionFactory.newConnection();

    // Open a channel on the connection
    Channel channel = connection.createChannel();

    // Declare the queue this channel will use
    /*
     * Param 1: queue name; the queue is created if it does not exist
     * Param 2: durable - true for a durable queue, false for a non-durable one
     * Param 3: exclusive - true if the queue is exclusive to this connection, false otherwise
     * Param 4: autoDelete - true to delete the queue once its last consumer is gone, false to keep it
     * Param 5: arguments - additional queue arguments
     * */
    channel.queueDeclare("hello", false, false, false, null);


    // Publish a message
    /*
     * Param 1: exchange name ("" is the default exchange)
     * Param 2: routing key; with the default exchange this is the queue name
     * Param 3: message properties, e.g. MessageProperties.PERSISTENT_TEXT_PLAIN for a persistent message
     * Param 4: message body
     * */
    channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());

    channel.close();
    connection.close();
}

Consumer

public static void main(String[] args) throws IOException, TimeoutException {
    // Create the connection factory

    ConnectionFactory connectionFactory = new ConnectionFactory();

    connectionFactory.setHost("192.168.128.144");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/test");

    connectionFactory.setUsername("nf");
    connectionFactory.setPassword("123456");

    // Open a connection
    Connection connection = connectionFactory.newConnection();

    // Open a channel
    Channel channel = connection.createChannel();

    // Declare the queue (must match the producer's declaration)
    channel.queueDeclare("hello", false, false, false, null);

    // Consume messages
    /*
     * Param 1: name of the queue to consume from
     * Param 2: autoAck - whether messages are acknowledged automatically
     * Param 3: callback invoked for each delivered message
     * */
    channel.basicConsume("hello", true, new DefaultConsumer(channel) {
        /**
         *
         * @param consumerTag
         * @param envelope
         * @param properties
         * @param body the message taken from the queue
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("Message: " + new String(body));
        }
    });
}

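2. Work queues (task queues)

In the first tutorial we wrote programs that send and receive messages through a named queue. In this example we create a work queue that distributes time-consuming tasks among multiple workers.
The main idea behind work queues (also called task queues) is to avoid executing a resource-intensive task immediately and having to wait for it to complete. Instead, the task is scheduled to be done later: we wrap it in a message and send it to a queue. A worker process running in the background takes the task off the queue and eventually executes it. When several workers are running, the tasks are shared among them.
This concept is especially useful in web applications, where a complex task cannot be handled within the short window of an HTTP request.

![image.png](https://www.icode9.com/i/ll/?i=img_convert/f808c8a00a9a5529889ed6feb70c8c3b.png#align=left&display=inline&height=127&margin=[object Object]&name=image.png&originHeight=127&originWidth=385&size=7871&status=done&style=none&width=385)
P: producer     middle: message queue     C: consumer
Producer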

package com.nf.rabbitmq.work;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory used to connect to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Host of the RabbitMQ server
        connectionFactory.setHost("192.168.128.144");
        // Port
        connectionFactory.setPort(5672);
        // Virtual host
        connectionFactory.setVirtualHost("/test");
        // Username and password
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        // Open a connection
        Connection connection = connectionFactory.newConnection();
        // Open a channel on the connection
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", false, false, false, null);

        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", "work", null, ("hello World ------------" + i).getBytes());
        }

        // Close the channel
        channel.close();
        // Close the connection
        connection.close();
    }
}

Consumer 1

package com.nf.rabbitmq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * First consumer
 */
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory used to connect to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Host of the RabbitMQ server
        connectionFactory.setHost("192.168.128.144");
        // Port
        connectionFactory.setPort(5672);
        // Virtual host
        connectionFactory.setVirtualHost("/test");
        // Username and password
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        // Open a connection
        Connection connection = connectionFactory.newConnection();
        // Open a channel on the connection
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", false, false, false, null);

        // autoAck = true: each message is acknowledged as soon as it is delivered
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: " + new String(body));
            }
        });
    }
}

Consumer 2

package com.nf.rabbitmq.work;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Second consumer
 */
public class Customer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory used to connect to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Host of the RabbitMQ server
        connectionFactory.setHost("192.168.128.144");
        // Port
        connectionFactory.setPort(5672);
        // Virtual host
        connectionFactory.setVirtualHost("/test");
        // Username and password
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        // Open a connection
        Connection connection = connectionFactory.newConnection();
        // Open a channel on the connection
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", false, false, false, null);

        // autoAck = true: each message is acknowledged as soon as it is delivered
        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: " + new String(body));
            }
        });
    }
}

Demo result:
Consumer 1:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/18f0f3fb4bdd75d756a5843e7bde48e4.png#align=left&display=inline&height=175&margin=[object Object]&name=image.png&originHeight=175&originWidth=498&size=38730&status=done&style=shadow&width=498)

Consumer 2:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/34feb57b996309720b379bbf7315b3d3.png#align=left&display=inline&height=183&margin=[object Object]&name=image.png&originHeight=183&originWidth=504&size=39270&status=done&style=shadow&width=504)

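Note: by default the queue distributes messages evenly (round-robin). As the screenshots above show, after the producer pushes its messages to the queue, consumer 1 and consumer 2 each receive the same number of them. autoAck is also true, so each message is acknowledged to the broker automatically as soon as it is delivered.

Letting the faster consumer do more of the work: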

![image.png](https://www.icode9.com/i/ll/?i=img_convert/6a096ad6ca22c08be3d977cea8e75655.png#align=left&display=inline&height=345&margin=[object Object]&name=image.png&originHeight=345&originWidth=931&size=90337&status=done&style=shadow&width=931)

![image.png](https://www.icode9.com/i/ll/?i=img_convert/a156de464cd65aa9387ba2f09666b12b.png#align=left&display=inline&height=438&margin=[object Object]&name=image.png&originHeight=438&originWidth=931&size=91595&status=done&style=shadow&width=931)
Result:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/d335f210bfd668e40df53e89f89d6b8e.png#align=left&display=inline&height=345&margin=[object Object]&name=image.png&originHeight=345&originWidth=780&size=85759&status=done&style=shadow&width=780)

![image.png](https://www.icode9.com/i/ll/?i=img_convert/a26180f6cf9e0dd22efbc9d77e7ab30b.png#align=left&display=inline&height=216&margin=[object Object]&name=image.png&originHeight=216&originWidth=845&size=45735&status=done&style=shadow&width=845)
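
The difference between the default round-robin distribution and the "faster consumer does more" mode can be illustrated without a broker. The following is a toy simulation, not RabbitMQ client code: `DispatchSimulation`, `roundRobin`, `fairDispatch`, and the per-message costs are hypothetical names and numbers chosen for illustration. With the Java client, the second mode is normally obtained by calling `channel.basicQos(1)`, consuming with autoAck set to false, and calling `channel.basicAck(envelope.getDeliveryTag(), false)` once a message has been processed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of how a queue hands messages to competing consumers. */
final class DispatchSimulation {

    /** Round-robin: message i goes to consumer i % n, however busy that consumer is. */
    static List<List<Integer>> roundRobin(int messages, int consumers) {
        List<List<Integer>> assigned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assigned.add(new ArrayList<>());
        }
        for (int m = 0; m < messages; m++) {
            assigned.get(m % consumers).add(m);
        }
        return assigned;
    }

    /**
     * Prefetch of 1 with manual acks: a consumer gets its next message only after
     * it has finished (acknowledged) the previous one. costPerMessage[c] is the
     * assumed time consumer c needs to process one message.
     */
    static List<List<Integer>> fairDispatch(int messages, int[] costPerMessage) {
        int n = costPerMessage.length;
        List<List<Integer>> assigned = new ArrayList<>();
        long[] freeAt = new long[n]; // time at which each consumer becomes idle
        for (int c = 0; c < n; c++) {
            assigned.add(new ArrayList<>());
        }
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                // The consumer that becomes idle first wins; ties go to the lower index
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            assigned.get(next).add(m);
            freeAt[next] += costPerMessage[next];
        }
        return assigned;
    }

    public static void main(String[] args) {
        System.out.println("round-robin: " + roundRobin(10, 2));
        System.out.println("prefetch 1 : " + fairDispatch(10, new int[] {1, 4}));
    }
}
```

With costs of 1 and 4 time units, the fast consumer ends up with most of the ten messages, whereas round-robin always splits them five and five.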

Publish/Subscribe (publish-subscribe pattern, broadcast)

![image.png](https://www.icode9.com/i/ll/?i=img_convert/a3afa9bb43b28cd5f69cb3958426f374.png#align=left&display=inline&height=128&margin=[object Object]&name=image.png&originHeight=128&originWidth=375&size=6420&status=done&style=shadow&width=375)

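In broadcast mode, a message flows as follows:

The producer sends the message to an exchange -> the exchange delivers it to every queue bound to it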
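
A fanout exchange ignores the routing key and copies each message to every queue that is bound to it at the time of publishing. A minimal in-memory sketch of that behaviour follows; the class `FanoutSimulation`, its methods, and the queue names are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a fanout exchange. */
final class FanoutSimulation {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; fanout needs no routing key. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every queue that is bound right now. */
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns the messages delivered so far to the given queue. */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSimulation logs = new FanoutSimulation();
        logs.bind("consumer1-queue");
        logs.bind("consumer2-queue");
        logs.publish("hello");
        System.out.println(logs.messagesIn("consumer1-queue"));
        System.out.println(logs.messagesIn("consumer2-queue"));
    }
}
```

A queue bound after a message was published does not receive that message, which is why the consumers in this section should be started before the producer.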

Producer:

package com.nf.rabbitmq.fanout;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory used to connect to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Host of the RabbitMQ server
        connectionFactory.setHost("192.168.128.144");
        // Port
        connectionFactory.setPort(5672);
        // Virtual host
        connectionFactory.setVirtualHost("/test");
        // Username and password
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        // Open a connection
        Connection connection = connectionFactory.newConnection();
        // Open a channel on the connection
        Channel channel = connection.createChannel();


        /*
         * Param 1: exchange name (must match the name used in basicPublish and by the consumers)
         * Param 2: exchange type; "fanout" is the broadcast type
         *
         * */
        channel.exchangeDeclare("logs", "fanout");

        /*
        * Param 1: exchange to publish to
        * Param 2: routing key (ignored by a fanout exchange)
        * Param 3: message properties
        * Param 4: message body
        * */
        channel.basicPublish("logs", "", null, "I am the producer, sending a message".getBytes());


        // Close the channel
        channel.close();
        // Close the connection
        connection.close();
    }
}

Consumer 1

package com.nf.rabbitmq.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer 1
 */
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory used to connect to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Host of the RabbitMQ server
        connectionFactory.setHost("192.168.128.144");
        // Port
        connectionFactory.setPort(5672);
        // Virtual host
        connectionFactory.setVirtualHost("/test");
        // Username and password
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        // Open a connection
        Connection connection = connectionFactory.newConnection();
        // Open a channel on the connection
        Channel channel = connection.createChannel();

        // Declare the exchange on this channel
        channel.exchangeDeclare("logs", "fanout");

        // Create a temporary, server-named queue and bind it to the exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs", "");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            // Print every message delivered to this consumer
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: " + new String(body));
            }
        });
    }
}

Consumer 2

package com.nf.rabbitmq.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer 2
 */
public class Customer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory used to connect to the broker
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Host of the RabbitMQ server
        connectionFactory.setHost("192.168.128.144");
        // Port
        connectionFactory.setPort(5672);
        // Virtual host
        connectionFactory.setVirtualHost("/test");
        // Username and password
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        // Open a connection
        Connection connection = connectionFactory.newConnection();
        // Open a channel on the connection
        Channel channel = connection.createChannel();

        // Declare the exchange on this channel
        channel.exchangeDeclare("logs", "fanout");

        // Create a temporary, server-named queue and bind it to the exchange
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, "logs", "");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            // Print every message delivered to this consumer
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: " + new String(body));
            }
        });
    }
}

Result
![image.png](https://www.icode9.com/i/ll/?i=img_convert/200807b06021f4ee2bc1c8c114fe1d17.png#align=left&display=inline&height=82&margin=[object Object]&name=image.png&originHeight=82&originWidth=329&size=17026&status=done&style=none&width=329)

![image.png](https://www.icode9.com/i/ll/?i=img_convert/7340b839b9b08bc8863c4546829035e3.png#align=left&display=inline&height=111&margin=[object Object]&name=image.png&originHeight=111&originWidth=349&size=17678&status=done&style=none&width=349)

Routing

![image.png](https://www.icode9.com/i/ll/?i=img_convert/448fd82db13c67a6c8dffd696c0503f5.png#align=left&display=inline&height=165&margin=[object Object]&name=image.png&originHeight=165&originWidth=458&size=12753&status=done&style=none&width=458)

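In the previous tutorial we built a simple logging system that could broadcast log messages to many receivers.
In this tutorial we add a feature to it: the ability to subscribe to only a subset of the messages. For example, we can direct only critical error messages to the log file (to save disk space) while still printing all log messages on the console.

The producer emits log messages of different levels, such as error, info, debug, warn, and fatal, and different consumers receive and handle the levels they are bound to.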
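
A direct exchange delivers a message only to the queues whose binding key equals the message's routing key. The in-memory sketch below mimics the bindings used in this section: one queue bound with error, another bound with both error and info. `DirectExchangeSimulation` and the queue names q1 and q2 are hypothetical and are not RabbitMQ client code; with the Java client, a producer would publish using a call such as `channel.basicPublish("logs_direct", "error", null, body)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a direct exchange. */
final class DirectExchangeSimulation {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Binds a queue with one routing key; a queue may be bound several times. */
    void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        List<String> bound = bindings.computeIfAbsent(routingKey, k -> new ArrayList<>());
        if (!bound.contains(queue)) {
            bound.add(queue);
        }
    }

    /** Delivers the message to every queue bound with exactly this routing key. */
    void publish(String routingKey, String message) {
        for (String queue : bindings.getOrDefault(routingKey, new ArrayList<>())) {
            queues.get(queue).add(message);
        }
    }

    /** Returns the messages delivered so far to the given queue. */
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectExchangeSimulation logsDirect = new DirectExchangeSimulation();
        logsDirect.bind("q1", "error");           // like consumer 1
        logsDirect.bind("q2", "error");           // like consumer 2
        logsDirect.bind("q2", "info");
        logsDirect.publish("error", "an error message");
        logsDirect.publish("info", "an info message");
        logsDirect.publish("warn", "a warn message"); // no binding: dropped
        System.out.println("q1: " + logsDirect.messagesIn("q1"));
        System.out.println("q2: " + logsDirect.messagesIn("q2"));
    }
}
```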

Consumer 1:

package com.nf.rabbitmq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer that only receives error messages
 */
public class Customer1 {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setHost("192.168.128.144");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        Connection connection = connectionFactory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the exchange on this channel
        channel.exchangeDeclare("logs_direct", "direct");

        // Create a temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with a routing key
        channel.queueBind(queueName, "logs_direct", "error");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            // Print every message delivered to this consumer
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: " + new String(body));
            }
        });

    }
}

Consumer 2

package com.nf.rabbitmq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer that only receives info and error messages
 */
public class Customer2 {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setHost("192.168.128.144");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        Connection connection = connectionFactory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the exchange on this channel
        channel.exchangeDeclare("logs_direct", "direct");

        // Create a temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange once per routing key
        channel.queueBind(queueName, "logs_direct", "error");
        channel.queueBind(queueName, "logs_direct", "info");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            // Print every message delivered to this consumer
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: " + new String(body));
            }
        });

    }
}

Result:
Consumer 1

![image.png](https://www.icode9.com/i/ll/?i=img_convert/3574610c65e5faca5cbccac31a4b819e.png#align=left&display=inline&height=108&margin=[object Object]&name=image.png&originHeight=108&originWidth=495&size=23448&status=done&style=none&width=495)

Consumer 2:
![image.png](https://www.icode9.com/i/ll/?i=img_convert/3e3d6f1e1821df87375a94eac2b26ba6.png#align=left&display=inline&height=130&margin=[object Object]&name=image.png&originHeight=130&originWidth=488&size=30678&status=done&style=none&width=488)

Topics

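Like a direct exchange, a topic exchange routes messages to different queues according to the routing key. The difference is that a topic exchange lets a queue use wildcards in the key it binds with. In this model a routing key usually consists of one or more words separated by ".", for example: item.insert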

![image.png](https://www.icode9.com/i/ll/?i=img_convert/f2850d0d4ec7074f868a5b2d296e5694.png#align=left&display=inline&height=166&margin=[object Object]&name=image.png&originHeight=166&originWidth=475&size=13171&status=done&style=none&width=475)

![image.png](https://www.icode9.com/i/ll/?i=img_convert/a0fc1590205649478d5b05b9dc08be45.png#align=left&display=inline&height=36&margin=[object Object]&name=image.png&originHeight=36&originWidth=665&size=11977&status=done&style=none&width=665)

#: matches zero or more words
*: matches exactly one word
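
The two wildcards can be made concrete with a small matcher. This is a sketch of the matching rule only, written for illustration; `TopicMatcher` is a hypothetical class and is not how the broker or the Java client implements routing. The sample keys are the ones published by the producer in this section.

```java
/** Illustrative matcher for topic binding patterns ("*" = one word, "#" = zero or more words). */
final class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int skip = j; skip <= k.length; skip++) {
                if (match(p, i + 1, k, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] keys = {"user.save", "user.save.delete", "person.user.save", "user"};
        for (String key : keys) {
            System.out.println(key
                    + "  user.*=" + matches("user.*", key)
                    + "  user.#=" + matches("user.#", key));
        }
    }
}
```

Under this rule a queue bound with user.* receives only user.save, while a queue bound with user.# receives user.save, user.save.delete, and user; person.user.save matches neither pattern.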

Producer:

package com.nf.rabbitmq.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.128.144");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/test");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        /*
         * Param 1: exchange name  Param 2: exchange type
         *
         * */
        channel.exchangeDeclare("topics", "topic");
        /*
        * Param 1: exchange name
        * Param 2: routing key
        * Param 3: message properties (none here)
        * Param 4: message body
        *
        * */
        channel.basicPublish("topics", "user.save", null, ("I am the producer, publishing: user.save").getBytes());
        channel.basicPublish("topics", "user.save.delete", null, ("I am the producer, publishing: user.save.delete").getBytes());
        channel.basicPublish("topics", "person.user.save", null, ("I am the producer, publishing: person.user.save").getBytes());
        channel.basicPublish("topics", "user", null, ("I am the producer, publishing: user").getBytes());

        channel.close();
        connection.close();

    }
}

Consumer 1:

package com.nf.rabbitmq.topics;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer bound with the pattern user.*
 */
public class Customer1 {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setHost("192.168.128.144");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        Connection connection = connectionFactory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the exchange on this channel
        channel.exchangeDeclare("topics", "topic");

        // Create a temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with a wildcard routing key
        channel.queueBind(queueName, "topics", "user.*");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: " + new String(body));
            }
        });

    }
}

Consumer 2:

package com.nf.rabbitmq.topics;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer bound with the pattern user.#
 */
public class Customer2 {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setHost("192.168.128.144");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("nf");
        connectionFactory.setPassword("123456");
        Connection connection = connectionFactory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the exchange on this channel
        channel.exchangeDeclare("topics", "topic");

        // Create a temporary queue
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with a wildcard routing key
        channel.queueBind(queueName, "topics", "user.#");

        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: " + new String(body));
            }
        });

    }
}

Result:
Consumer 1
![image.png](https://www.icode9.com/i/ll/?i=img_convert/7fe0f3709a61a40832de9bad92d9ab01.png#align=left&display=inline&height=104&margin=[object Object]&name=image.png&originHeight=104&originWidth=424&size=14975&status=done&style=none&width=424)
 
Consumer 2
![image.png](https://www.icode9.com/i/ll/?i=img_convert/bf986edaae6e9f6be32a9826978b2e5e.png#align=left&display=inline&height=138&margin=[object Object]&name=image.png&originHeight=138&originWidth=526&size=33873&status=done&style=none&width=526)

RabbitMQ with Spring Boot

Dependency

<!-- RabbitMQ dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Configuration file

spring:
  rabbitmq:
    host: 192.168.16.132
    port: 5672
    username: nf
    password: 123456
    virtual-host: /test

Direct connection

package com.nf.rabbitmq.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component // @Queue defaults: durable, non-exclusive, not auto-delete
@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
public class HelloCustomer {

    @RabbitHandler
    public void test(String message) {
        System.out.println(message);
    }
}

Work queues

package com.nf.rabbitmq.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void test1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void test2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}

Publish/Subscribe (publish-subscribe pattern, broadcast)

package com.nf.rabbitmq.hello;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Broadcast (fanout)
 * Publish/Subscribe
 */
@Component
public class FanoutCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue, // temporary (anonymous) queue
                    exchange = @Exchange( // exchange to bind to
                            value = "logs",
                            type = "fanout"
                    ))
    })
    public void test1(String message) {
        System.out.println("Consumer 1: " + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(
                            value = "logs",
                            type = ExchangeTypes.FANOUT
                    ))
    })
    public void test2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}

Routing

package com.nf.rabbitmq.hello;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class RouteCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT), // custom exchange name and type
                    key = {
                            "info", "error", "warn"
                    }
            )
    })
    public void test1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT), // custom exchange name and type
                    key = {
                            "info",
                    }
            )
    })
    public void test2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}

Topics

package com.nf.rabbitmq.hello;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicCustomer {


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(
                            name = "topics",
                            type = ExchangeTypes.TOPIC),
                    key = {"user.save", "user.*"})

    })
    public void test1(String message) {
        System.out.println("Consumer 1: " + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(
                            name = "topics",
                            type = ExchangeTypes.TOPIC),
                    key = {"user.#"})

    })
    public void test2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}
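
The binding keys above rely on the two topic wildcards: `*` stands for exactly one dot-separated word, and `#` stands for zero or more words. The sketch below imitates that matching rule in plain Java so the patterns can be checked without a broker. `TopicMatcher` is a hypothetical helper written for this article, not a Spring AMQP or RabbitMQ class, and it assumes non-empty keys made of dot-separated words.

```java
// Hypothetical helper that imitates topic-exchange pattern matching.
final class TopicMatcher {

    // True when the routing key matches the binding key (pattern).
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: a match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern still expects a word, but none are left.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // '*' or a literal word consumes exactly one word.
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] routingKeys = {"user.save", "user.save.delete", "user.delete"};
        for (String key : routingKeys) {
            System.out.println(key
                    + " | user.* : " + matches("user.*", key)
                    + " | user.# : " + matches("user.#", key));
        }
    }
}
```

Applied to the listeners above, `user.save` and `user.delete` match both `user.*` and `user.#`, so both consumers receive them, while `user.save.delete` matches only `user.#` and reaches consumer 2 alone. A queue whose bindings match a message more than once (as with `user.save` and `user.*` for the key `user.save`) still receives that message only once.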

Testing

package com.nf.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = RabbitmqApplication.class)
class RabbitmqApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    void hello() {
        rabbitTemplate.convertAndSend("hello", "hello world");
    }


    /**
     * Work queue
     */
    @Test
    void work() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "hello world + :" + i);

        }
    }

    /**
     * Broadcast (fanout exchange)
     * Publish/Subscribe
     */
    @Test
    public void fanout() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("logs", "", "fanout");
        }
    }

    /**
     * Routing mode (direct exchange)
     */
    @Test
    public void routeDirects() {
        rabbitTemplate.convertAndSend("directs", "info", "message sent with routing key info");
        rabbitTemplate.convertAndSend("directs", "error", "message sent with routing key error");
    }

    /**
     * Dynamic routing (topic exchange)
     */
    @Test
    public void topicRoute() {
        rabbitTemplate.convertAndSend("topics", "user.save", "message sent with routing key user.save");
        rabbitTemplate.convertAndSend("topics", "user.save.delete", "message sent with routing key user.save.delete");
        rabbitTemplate.convertAndSend("topics", "user.delete", "message sent with routing key user.delete");
    }

}

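Summary

Notes on RabbitMQ configuration
Besides creating queues and exchanges manually, there are two ways to declare them from code: with a configuration class, or with annotations.

1. Configuration class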

package com.nf.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration class
 */
@Configuration
public class RabbitMQConfig {
    // Exchange name
    public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
    // Queue name
    public static final String ITEM_QUEUE = "item_queue";

    // Declare the exchange
    @Bean("itemTopicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }

    // Declare the queue
    @Bean("itemQueue")
    public Queue itemQueue() {
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }

    // Bind the queue to the exchange
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("item1.#").noargs();
    }


    
    // Declare the second queue; ITEM_QUEUE + 2 evaluates to "item_queue2"
    @Bean("itemQueue2")
    public Queue itemQueue2() {
        return QueueBuilder.durable(ITEM_QUEUE + 2).build();
    }

    /*
     * Bind the second queue to the exchange.
     * BindingBuilder.bind(queue).to(exchange).with("item2.#").noargs() binds queue to exchange
     * using the routing pattern "item2.#". The qualifier selects the itemQueue2 bean.
     */
    @Bean
    public Binding itemQueueExchange2(@Qualifier("itemQueue2") Queue queue,
                                      @Qualifier("itemTopicExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("item2.#").noargs();
    }

}

2. Annotations

package com.nf.rabbitmq.config;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listener class
 */
@Component
public class MyListner {

//    @RabbitListener(queues = "item_queue")
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("item_queue"),
                    exchange = @Exchange(value = "item_topic_exchange", type = ExchangeTypes.TOPIC),
                    key = {"item1.#"}
            )
    })
    public void msg(String msg) {
        System.out.println("Consumer received message: " + msg);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("item_queue2"),
                    exchange = @Exchange(value = "item_topic_exchange", type = ExchangeTypes.TOPIC),
                    key = {"item2.#"}
            )
    })
    public void msg2(String msg) {
        System.out.println("Consumer 2 received message: " + msg);
    }
}


Source: https://blog.csdn.net/qq_41564402/article/details/113430380