其他分享
首页 > 其他分享> > RabbitMq- Publish/Subscribe订阅模式(三)

RabbitMq- Publish/Subscribe订阅模式(三)

作者:互联网

一、订阅模式类型:

1.1 订阅模式示例图:

重点:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

1.2 订阅模式小结

订阅模式与前面的两种模式比较:多了一个角色Exchange交换机,接收生产者发送的消息并决定如何投递消息到其绑定的队列;消息的投递决定于交换机的类型。

交换机类型:广播(fanout)、定向(direct)、通配符(topic)

交换机只做消息转发,自身不存储数据。

二、 Publish/Subscribe发布与订阅模式

2.1 发布订阅模式:

1)每个消费者监听自己的队列。

2)生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收 到消息

3)发布与订阅模式特点:一个消息可以被多个消费者接收;其实是使用了订阅模式,交换机类型为:fanout广播

2.2 需求:

编写生产者、消费者代码并测试了解Publish/Subscribe发布与订阅模式的特点

步骤:

代码示例-生产者:

package publishsubscribe;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.study.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

//发布与订阅模式 发送消息

public class Producer {
    //交换机名称
    static final String FANOUT_EXCHANGE = "fanout_exchange";
    //队列名称
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    static final String QUEUE_NAME = "publish_queue";

    public static void main(String[] args) throws Exception  {
        // 1. 创建连接
        Connection connection = ConnectionUtil.getConnection();
        // 2. 创建频道;
        Channel channel = connection.createChannel();
        // 3.声明交换机,参数1:交换机名称,参数2:交换机类型(FANOUT广播类型、direct 定向路由,topic 通配符)
        channel.exchangeDeclare(FANOUT_EXCHANGE , BuiltinExchangeType.FANOUT);
//   4. 声明队列;
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
         * 参数3:是否独占本连接
         * 参数4:是否在不使用的时候队列自动删除
         * 参数5:其它参数
         */
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

        //5 队列绑定到交换机(参数1:队列名称  参数2:交换机  参数3:路由key)
        channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
        channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");

            //6. 发送消息;
        for(int i=1;i<=10;i++){
                String message = "你好! publishSubscribe发布与订阅模式---"+i;
                /**
                 * 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
                 * 参数2:路由key,简单模式中可以使用队列名称
                 * 参数3:消息其它属性
                 * 参数4:消息内容
                 */
                channel.basicPublish(FANOUT_EXCHANGE,"",null,message.getBytes());
                System.out.println("已发送消息:" + message);

            }
            //6. 关闭资源
            channel.close();
            connection.close();
            }

}

代码示例-消费者:

package publishsubscribe;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;

import java.io.IOException;


/**
 * 发布与订阅模式;消费者接收消息
 */
public class Consumer2 {
    public static void main(String[] args) throws Exception {
//        1. 创建连接;(抽取一个获取连接的工具类)
        Connection connection = ConnectionUtil.getConnection();
//        2. 创建频道;
        Channel channel = connection.createChannel();
        //3 声明交换机
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
//        4. 声明队列;
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
         * 参数3:是否独占本连接
         * 参数4:是否在不使用的时候队列自动删除
         * 参数5:其它参数
         */
        channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
        //5 队列绑定到交换机上
        channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE,"");
//        6. 创建消费者(接收消息并处理消息);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key为:" + envelope.getRoutingKey());
                //交换机
                System.out.println("交换机为:" + envelope.getExchange());
                //消息id
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                //接收到的消息
                System.out.println("消费者2---接收到的消息为:" + new String(body, "utf-8"));
            }
        };
//        6. 监听队列   (需要持续监听队列消息,所以不要关闭资源)
        /**
         * 参数1:队列名
         * 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
         * 如果设置为false则需要手动确认
         * 参数3:消费者
         */
        channel.basicConsume(Producer.FANOUT_QUEUE_2,true,defaultConsumer);
        //不关闭资源,应该一直监听消息
        // channel.close();
        // connection.close();


    }

}

三、发布与订阅模式小结:

发布与订阅模式:一个消息可以被多个消费者接收;一个消费者对于的队列,该队列只能被一个消费者监听。使用了订阅模式中交换机类型为:广播。

标签:队列,RabbitMq,Subscribe,FANOUT,参数,Publish,交换机,channel,消息
来源: https://www.cnblogs.com/nanao/p/15422347.html