其他分享
首页 > 其他分享> > RabbitMQ的使用(二):RabbitMQ中的五种通信模型

RabbitMQ的使用(二):RabbitMQ中的五种通信模型

作者:互联网

文章目录

5.1、helloworld模型

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d0LabZFw-1584019131982)(pic\1583822258266.png)]

意思是:生产者将消息发送到队列 然后队列将这个消息发送给消费者

5.1.1、导包
  <!--导入RabbitMQ的相关的包-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.5.0</version>
        </dependency>
5.1.2、生产者的写法
 //申明队列的名字
    private static final String QUEUE_NAME="nz1904-helloword";

    public static void main(String[] args) throws IOException, TimeoutException {

        //第一步:获取连接
        Connection connection = ConnectionUtils.getConnection();
        //第二步:创建数据传输的通道
        Channel channel = connection.createChannel();
        //第三步:申明队列
        /**
         * 第一个参数:队列的名字
         * 第二个参数:是否持久化   比如现在发送到队列里面的消息 如果没有持久化 重启这个队列后数据会丢失(false) true:重启之后数据依然在
         * 第三个参数:是否排外
         *            1:连接关闭之后 这个队列是否自动删除
         *            2:是否允许其他通道来进行访问这个数据
         * 第四个参数:是否允许自动删除
         *            就是当最后一个连接断开的时候  这个时候是否允许自动删除这个队列
         * 第五个参数:申明队列的时候 要附带的一些参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送数据到队列

        /**
         * 第一个参数:exchange  交换机  没有就设置为"值就可以了"
         * 第二个参数:原本的意思是路由的key  现在没有key直接使用队列的名字
         * 第三个参数:发送数据到队列的时候 是否要带一些参数  没有带任何参数
         * 第四个参数:向队列中发送的数据
         */
        channel.basicPublish("",QUEUE_NAME,null,"我是小波波1111".getBytes());
        channel.close();
        connection.close();
    }
5.1.3、消费者的写法
 //申明队列的名字
    private static final String QUEUE_NAME="nz1904-helloword";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建通道
        final Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消费者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag:这个消息的唯一标记
             * @param envelope:信封(请求的消息属性的一个封装)
             * @param properties:前面队列带过来的值
             * @param body  :接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到的消息是:"+new String(body));
                //进行手动应答
                /**
                 * 第一个参数:自动应答的这个消息标记
                 * 第二个参数:false 就相当于告诉队列受到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //绑定这个消费者
        /**
         * 第一个参数:队列的名字
         * 第二个参数:是否自动应答
         * 第三个参数:消费者的申明
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
}

5.2、work模型的玩法

多个消费者消费的数据之和才是原来队列中的所有数据 适用于流量的消峰

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oba16bOM-1584019131983)(pic\1583827843958.png)]

5.2.1、生产者的编写
public class Producer {

    private static final String QUEUE_NAME="nz1904-work";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //下面向队列中发送100条消息
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish("",QUEUE_NAME,null,("我是工作模型:"+i).getBytes());
        }
        channel.close();
        connection.close();
    }

}

5.2.2、消费者1的编写
public class Consumer {

    //申明队列的名字
    private static final String QUEUE_NAME="nz1904-work";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建通道
        final Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消费者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag:这个消息的唯一标记
             * @param envelope:信封(请求的消息属性的一个封装)
             * @param properties:前面队列带过来的值
             * @param body  :接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接受到的消息是:"+new String(body));
                //进行手动应答
                /**
                 * 第一个参数:自动应答的这个消息标记
                 * 第二个参数:false 就相当于告诉队列受到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //绑定这个消费者
        /**
         * 第一个参数:队列的名字
         * 第二个参数:是否自动应答
         * 第三个参数:消费者的申明
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }
}
5.2.3、消费者2的编写
public class Consumer1 {

    //申明队列的名字
    private static final String QUEUE_NAME="nz1904-work";

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接
        Connection connection = ConnectionUtils.getConnection();

        //创建通道
        final Channel channel = connection.createChannel();

        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消费者的申明
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /**
             *
             * @param consumerTag:这个消息的唯一标记
             * @param envelope:信封(请求的消息属性的一个封装)
             * @param properties:前面队列带过来的值
             * @param body  :接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接受到的消息是:"+new String(body));
                //进行手动应答
                /**
                 * 第一个参数:自动应答的这个消息标记
                 * 第二个参数:false 就相当于告诉队列受到消息了
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //绑定这个消费者
        /**
         * 第一个参数:队列的名字
         * 第二个参数:是否自动应答
         * 第三个参数:消费者的申明
         */
        channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

    }
}

5.3、发布订阅模型

在这里插入图片描述

简单的说就是队列里面的消息会被几个消费者 同时接受到

模型 适合于做模块之间的异步通信

例子: 就可以使用这种模型 来发送日志信息 ------ 立马就会被log收集程序

收集到 直接写到咋们的文件里面

例子:springcloud的config组件里面通知配置自动更新

例子:缓存同步也可以使用这一个

例子:高并发下实现下单逻辑

5.3.1、编写生产者
public class Producer {

    //申明交换机的名字
    private static final String EXCHANGE_NAME="nz1904-fanout-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明交换机
        /**
         * 第一个参数:交换机的名字
         * 第二个参数:交换机的类型
         *    交换机的类型是不能乱写的   如果使用的是发布订阅模型  只能写 fanout
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //发送消息到交换机了
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模型的值:"+i).getBytes());
        }
        //关闭资源
        channel.close();
        connection.close();
    }

}
5.3.2、编写消费者
public class Consumer {
    //申明交换机的名字
    private static final String EXCHANGE_NAME="nz1904-fanout-01";
    //队列的名字
    private static final String QUEUE_NAME="nz1904-fanout-queue1";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //将队列绑定到交换机
        /**
         * 第一个参数:队列的名字
         * 第二个参数:交换机的名字
         * 第三个参数:路由的key(现在没有用到这个路由的key)
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列1接受到的数据是:"+new String(body));
            }
        };

        //就进行消费者的绑定
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
5.3.2、编写消费者2
public class Consumer1 {

    //申明交换机的名字
    private static final String EXCHANGE_NAME="nz1904-fanout-01";
    //队列的名字
    private static final String QUEUE_NAME="nz1904-fanout-queue2";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //将队列绑定到交换机
        /**
         * 第一个参数:队列的名字
         * 第二个参数:交换机的名字
         * 第三个参数:路由的key(现在没有用到这个路由的key)
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列2接受到的数据是:"+new String(body));
            }
        };

        //就进行消费者的绑定
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

5.4、路由模型

路由模式相当于是发布订阅模式的升级版

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zFRBJFPK-1584019131984)(pic\1583891119250.png)]

5.4.1、生产者的写法
public class Producer {
    private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 如果玩的是路由模型 交换机的类型只能是  direct
         */
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //发送信息到交换机
        for (int i = 0; i <100 ; i++) {
            if(i%2==0){
                //这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME,"xiaowangzi",null,("路由模型的值:"+i).getBytes());
            }else{
                //这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME,"xiaobobo",null,("路由模型的值:"+i).getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}
5.4.2、消费者1的写法
public class Cosnumer1 {

    private static final String QUEUE_NAME="nz1904-direct-queue-01";
    private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //绑定队列到交换机
        //第三个参数:表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaobobo");
        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               //这里就是接受消息的地方
                System.out.println("路由key是xiaobobo的这个队列接受到数据:"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}
5.4.3、消费者2的写法
public class Consumer2 {
    private static final String QUEUE_NAME="nz1904-direct-queue-02";
    private static final String EXCHANGE_NAME="nz1904-exchange-direct-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
      channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //绑定队列到交换机
        //第三个参数:表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaowangzi");
        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是xiaowangzi的这个队列接受到数据:"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

5.5、topic模式

说明:topic模式相当于是对 路由模式的一个升级 topic模式主要就是在匹配的规则上可以实现模糊匹配

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i4qeGTH7-1584019131985)(pic\1583892955712.png)]

5.5.1、生产者的写法
public class Producer {
    private static final String EXCHANGE_NAME = "nz1904-exchange-topic-01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 如果玩的是路由模型 交换机的类型只能是  direct
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //发送信息到交换机
        for (int i = 0; i < 100; i++) {
            //这个路由的key是可以随便设置的
            //topic在路由基础上只有 路由的key发生改变  其余的都不变
            channel.basicPublish(EXCHANGE_NAME, "xiaowangzi.xiaowangzi.xiaowangzi", null, ("路由模型的值:" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}
5.5.3、消费者1的写法
public class Cosnumer1 {

    private static final String QUEUE_NAME="nz1904-topic-queue-01";
    private static final String EXCHANGE_NAME="nz1904-exchange-topic-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //绑定队列到交换机
        //第三个参数:表示的是路由key
        /**
         *    注意  * :只是代表一个单词
         *         # :这个才代表  一个或者多个单词
         *         记住如果有多个单词组成的路由key  那么多个单词之间使用 . 好连接
         *
         *
         */
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaobobo.*");
        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               //这里就是接受消息的地方
                System.out.println("路由key是xiaobobo的这个队列接受到数据:"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

5.5.3、消费者2的写法
public class Consumer2 {
    private static final String QUEUE_NAME="nz1904-topic-queue-02";
    private static final String EXCHANGE_NAME="nz1904-exchange-topic-01";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //绑定队列到交换机
        //第三个参数:表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"xiaowangzi.#");
        //申明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是xiaowangzi的这个队列接受到数据:"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

备注:使用了交换机发送了数据 如果没有消费者的话那么这个数据会发生丢失 通过设置这样的属性来解决这个问题

channel.basicPublish(EXCHANGE_NAME, "xiaowangzi.xiaowangzi.xiaowangzi", MessageProperties.PERSISTENT_TEXT_PLAIN, ("路由模型的值:" + i).getBytes());

备注2:通道的问题

原本没有通道我们也可以完成这个请求 RabbitMQ官方考虑到一个问题生产者 和 消费者 实际上 Connection 引入这个通道这个概念 是为了降低TCP连接的这样一个消耗 相当于是为了 TCP的复用 还有一个目的 就是为了线程隐私 相当于每一个线程都给你创建了一个通道

希望大家关注我一波,防止以后迷路,有需要的可以加我Q讨论互相学习java ,学习路线探讨,经验分享与java Q:2415773436

标签:false,String,队列,通信模型,RabbitMQ,QUEUE,五种,channel,NAME
来源: https://blog.csdn.net/yuemmm/article/details/104828298