NetCore RabbitMQ 发布订阅模式,消息广播
作者:互联网
十年河东,十年河西,莫欺少年穷
学无止境,精益求精
上篇博客介绍了RabbitMQ的六种工作模式 RabbitMQ的六种工作模式
RabbitMQ的简单模式和Work工作模式请参考:NetCore RabbitMQ 简介及兔子生产者、消费者 【简单模式,work工作模式,竞争消费】
本篇博客使用NetCore完成RabbitMQ发布订阅模式中的广播模式
何为广播模式?
publish/subscribe发布订阅(共享资源)
- X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
- 相关场景:邮件群发,群聊天,广播(广告)
上图中X代表交换机,RabbitMQ中交换机的类型分为四种,分别为广播模式,定向模式,通配符模式,参数匹配模式
ExchangeType.Fanout【广播模式】
ExchangeType.Direct【定向模式】
ExchangeType.Topic【通配符模式】
ExchangeType.Headers 【参数匹配模式】
广播模式生产者
广播模式创建生产者,分为如下步骤,
1、声明一个交换机
2、声明广播的队列
3、交换机和队列进行绑定
4、生产消息
以上步骤用NetCore 实现如下:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqProducer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 using (var connection = factory.CreateConnection()) { //rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel using (var channel = connection.CreateModel()) { //exchange 交换机名称 //type 交换机类型 ExchangeType.Direct【定向模式】 ExchangeType.Fanout【广播模式】 ExchangeType.Topic【通配符模式】 ExchangeType.Headers 【参数匹配模式】 //durable 是否持久化 //autoDelete 队列是否为临时队列 //arguments 其他配置 详见博客:https://www.cnblogs.com/chenwolong/p/RabbitMQ_S.html //void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments); //声明一个交换机 string Ename = "ExRabbitMQ"; channel.ExchangeDeclare(Ename, ExchangeType.Fanout, false, false, null); //声明广播的队列 string Qname_1 = "RabbitMQ_Queue_1"; string Qname_2 = "RabbitMQ_Queue_2"; channel.QueueDeclare(Qname_1, false, false, false, null); channel.QueueDeclare(Qname_2, false, false, false, null); //交换机 队列 绑定 //queue 队列名称 //exchange 交换机名称 //routingKey 路由规则 //void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments); string routingKey = "EQ"; //路由匹配规则 交换机通过routingKey广播消息到绑定的队列 channel.QueueBind(Qname_1, Ename, routingKey); channel.QueueBind(Qname_2, Ename, routingKey); //发送消息 for(int i = 0; i < 100; i++) { var messages = "I am RabbitMQ"; //传递的消息内容 //exchange 交换机,如果使用默认的交换机,那么routingKey要和队列的名称一致 //routingKey:路由 //basicProperties : 用于基础属性设置 ///BasicPublish(this IModel model, string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body); channel.BasicPublish(Ename, routingKey, null, Encoding.UTF8.GetBytes(messages+"_"+i)); //生产消息 } } } Console.Read(); } } }
这里需要说明的是,当发布订阅模式为广播时,需要定义路由规则routingKey,绑定时,routingKey必须保持一致,RabbitMQ广播是通过routingKey进行队列匹配的。
消费者
广播模式生产者代码中,我们创建了两个队列,因此,我们需要两个消费者分别消费不同队列的消息,如下:
消费者1
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqConsumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"RabbitMQConsumer_1 收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100); }; //启动消费者 string Qname = "RabbitMQ_Queue_1"; channel.BasicConsume(Qname, false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); } } }
消费者2
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; namespace RabbitMQConsumer_2 { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"RabbitMQConsumer_2 收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100); }; //启动消费者 string Qname = "RabbitMQ_Queue_2"; channel.BasicConsume(Qname, false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); } } }
效果如下:
@陈卧龙的伯乐
标签:订阅,string,NetCore,factory,RabbitMQ,交换机,using,channel 来源: https://www.cnblogs.com/chenwolong/p/RabbitMQ_Fanout.html