NetCore RabbitMQ Topics 通配符模式
作者:互联网
十年河东,十年河西,莫欺少年穷
学无止境,精益求精
上一节介绍了RabbitMQ定向模式,本篇介绍 Topics 通配符模式
我的系列博客:
NetCore RabbitMQ 简介及兔子生产者、消费者 【简单模式,work工作模式,竞争消费】
kafka、Rabbitmq、EasyNetQ NetCore 源码下载
本篇介绍Topics通配符模式
1.5 topic 主题模式(路由模式的一种)
- 星号井号代表通配符
- 星号1个字符井号代表0个或多个字符
- 路由功能添加模糊匹配
- 消息产生者产生消息,把消息交给交换机
- 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
Topics通配符模式和定向模式类似,都是通过RoutingKey定向到不到的队列中,不同的是Topics通配符模式只是通配符,有了通配符,就可以做到模糊匹配。
Topics模式中支持两种通配符
* 代表1个字符
# 代表0个或多个字符
例如
路由Key定义为 *.Dog.* ,那么我们发送消息时中间为Dog且前后均为一个字符的路由key都可以匹配到该通配符,例如:C.Dog.D 【注意要带上 . 】
路由Key定义为 #.Dog ,那么发送消息时以Dog结尾的路由Key都可以匹配到该通配符,例如 chenwolong.Dog【注意要带上 . 】
路由Key定义为 Dog.# ,那么发送消息时以Dog开头的路由Key都可以匹配到该通配符 ,例如 Dog.Chenwolong【注意要带上 . 】
路由Key定义为 #.# ,那么 Dog.Cat,Cat.Dog . 等都可以匹配
下面由代码进行说明
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqProducer { class Program { /// <summary> /// 本示例红色预警为最高预警级别,蓝色预警次之,黄色预警等级最差 /// 红色预警队列中只展示红色预警消息, /// 蓝色预警队列中展示红色预警和蓝色预警 /// 黄色预警队列展示红、蓝、黄三种预警 /// </summary> /// <param name="args"></param> static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 using (var connection = factory.CreateConnection()) { //rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel using (var channel = connection.CreateModel()) { //exchange 交换机名称 //type 交换机类型 ExchangeType.Direct【定向模式】 ExchangeType.Fanout【广播模式】 ExchangeType.Topic【通配符模式】 ExchangeType.Headers 【参数匹配模式】 //durable 是否持久化 //autoDelete 队列是否为临时队列 //arguments 其他配置 详见博客:https://www.cnblogs.com/chenwolong/p/RabbitMQ_S.html //void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments); //声明一个交换机 类型 : Direct string Ename = "ExRabbitMQ_Topic"; channel.ExchangeDeclare(Ename, ExchangeType.Topic, false, false, null); //声明广播的队列 string Qname_Red = "Queue_Red"; string Qname_Blue = "Queue_Blue"; channel.QueueDeclare(Qname_Red, false, false, false, null); channel.QueueDeclare(Qname_Blue, false, false, false, null); //交换机 队列 绑定 //queue 队列名称 //exchange 交换机名称 //routingKey 路由规则 //void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments); string routingKey_Red = "Red.#"; // 红色预警 string routingKey_Blue = "*.Blue"; // 蓝色预警 // channel.QueueBind(Qname_Red, Ename, routingKey_Red); // channel.QueueBind(Qname_Blue, Ename, routingKey_Blue); //发送消息 for(int i = 0; i < 10 ; i++) { var messages_HotRed = "#号匹配0个或多个字符,我是高温红色预警(Red.Hot)"; //传递的消息内容 var messages_FloodRed = "#号匹配0个或多个字符,我是洪涝红色预警(Red.Floot)"; //传递的消息内容 var messages_Blue = "*号匹配一个字符,我是蓝色预警(A.Blue)"; //传递的消息内容 //exchange 交换机,如果使用默认的交换机,那么routingKey要和队列的名称一致 //routingKey:路由 //basicProperties : 用于基础属性设置 ///BasicPublish(this IModel model, string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body); //注意发送消息时,路由Key的匹配规则 channel.BasicPublish(Ename, "Red.Hot", null, Encoding.UTF8.GetBytes(messages_HotRed + "_" + i)); //生产高温红色预警消息 channel.BasicPublish(Ename, "Red.Flood", null, Encoding.UTF8.GetBytes(messages_FloodRed + "_" + i)); //生产洪涝红色预警消息 channel.BasicPublish(Ename, "A.Blue", null, Encoding.UTF8.GetBytes(messages_Blue + "_" + i)); //生产蓝色预警消息 } } } Console.Read(); } } }View Code
蓝色预警消费者端如下:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqConsumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"蓝色预警队列消费者收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100); }; //启动消费者 string Qname = "Queue_Blue"; channel.BasicConsume(Qname, false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); } } }View Code
红色预警消费者端如下:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqConsumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"红色预警队列消费者收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100); }; //启动消费者 string Qname = "Queue_Red"; channel.BasicConsume(Qname, false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); } } }View Code
上一节定向模式中,需要多次绑定,本节通配符模式可以模糊匹配
@天才卧龙的博客
标签:string,NetCore,预警,Topics,通配符,factory,RabbitMQ,using,channel 来源: https://www.cnblogs.com/chenwolong/p/Rabbit_Topic.html