其他分享
首页 > 其他分享> > .Net Core 5.x Api开发笔记 -- 消息队列RabbitMQ实现事件总线EventBus(二)

.Net Core 5.x Api开发笔记 -- 消息队列RabbitMQ实现事件总线EventBus(二)

作者:互联网

上一节说了事件总线

本节在消息队列中实现事件处理:.Net Core 5.x Api开发笔记 -- 消息队列RabbitMQ实现事件总线EventBus(一)

既然是消息队列,就需要有生产者和消费者(订阅)

 1 public interface IMessageQueue
 2 {
 3     /// <summary>
 4     /// 发布消息
 5     /// </summary>
 6     void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class;
 7 
 8     /// <summary>
 9     /// 订阅消息
10     /// </summary>
11     void Consume<T>(Func<T, Task<bool>> func, string queueName = "");
12 }

生产者端实现发布消息接口:

 1 public class RabbitMQPublishClient : IMessageQueue
 2 {
 3     private static ConcurrentDictionary<string, string> QueueDic = new ConcurrentDictionary<string, string>();
 4     public RabbitMQPublishClient(){}
 5     
 6     /// <summary>
 7     /// 发布消息
 8     /// </summary>
 9     /// <typeparam name="T"></typeparam>
10     /// <param name="message">消息实体</param>
11     /// <param name="exchangeName">交换机名称(默认default)</param>
12     /// <param name="queueName">队列名称(默认类名)</param>
13     public void Publish<T>(T message, string exchangeName = "default", string queueName = "") where T : class
14     {
15         using (var conn = GetConnection())
16         {
17             using (var channel = conn.CreateModel())
18             {
19                 if (!QueueDic.ContainsKey(queueName) || QueueDic[queueName] != exchangeName)
20                 {
21                     CreateQueue(channel, exchangeName, queueName, true);
22                     QueueDic.TryAdd(queueName, exchangeName);
23                 }
24                 var props = channel.CreateBasicProperties();
25                 props.Persistent = true;  //消息持久化
26                 props.DeliveryMode = 2;   //消息持久化
27                 props.CorrelationId = Guid.NewGuid().ToString();
28 
29                 string content = JsonConvert.SerializeObject(message);
30                 var body = Encoding.UTF8.GetBytes(content);
31                 channel.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: props, body: body);
32             }
33         }
34     }
35 
36     /// <summary>
37     /// 创建队列,绑定到交换机
38     /// </summary>
39     private void CreateQueue(IModel channel, string exchangeName, string queueName, bool isDurable = true)
40     {
41         channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: isDurable, autoDelete: false, arguments: null);
42         channel.QueueDeclare(queueName, durable: isDurable, exclusive: false, autoDelete: false, arguments: null);
43         channel.QueueBind(queueName, exchangeName, routingKey: queueName);
44     }
45 }

消费者(订阅者)实现订阅接口

 1 public class RabbitMQConsumerClient : IMessageQueue
 2 {
 3     public RabbitMQConsumerClient(){}
 4 
 5     /// <summary>
 6     /// 订阅消息
 7     /// </summary>
 8     public void Consume<T>(Func<T, Task<bool>> func, string queueName = "")
 9     {
10         string exchangeName = exchangeName.Equals("default") ? $"{customKey}.default" : $"{customKey}.{queueName}";
11 
12         var collection = GetConnection();
13         var channel = collection.CreateModel();
14         channel.ExchangeDeclare(exchangeName, "direct", durable: true, autoDelete: false, arguments: null);
15         channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
16         channel.QueueBind(queueName, exchangeName, routingKey: queueName, arguments: null);
17 
18         //消费事件
19         var consumer = new EventingBasicConsumer(channel);
20         consumer.Received += async (sender, e) =>
21         {
22             try
23             {
24                 string content = Encoding.UTF8.GetString(e.Body.ToArray());
25                 T message = JsonConvert.DeserializeObject<T>(content);
26                 bool isSuccess = await func.Invoke(message);    //执行func委托  这里才是真正执行传入的事件处理程序
27                 if (isSuccess)
28                 {
29                     channel.BasicAck(e.DeliveryTag, false);     //手动确认消息消费成功
30                 }
31                 else
32                 {
33                    channel.BasicReject(e.DeliveryTag, true);    //手动打回队列,下次重新消费
34                 }
35             }
36             catch (Exception ex)
37             {
38                 channel.BasicAck(e.DeliveryTag, false);
39             }
40         };
41 
42         channel.BasicQos(0, 1, false);  //限制同时接收消息数量为1,true是将限制应用于channel级,false是将限制应用于consumer级
43         channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);   //订阅消息   autoAck: false 表示手动确认消息
44     }
45 }

上边都是平时很常用的功能逻辑,接下来是才是重点!!!

1,事件总线有了

2,生产者订阅者也有了

接下来还需要做至少3件事:

1,创建事件处理程序,就是真正干活的

2,注册消费者、注册事件处理程序、绑定事件源和事件处理程序(在订阅端)

3,订阅端启动消费者订阅监控,并触发事件处理程序

4,发布消息测试

-----------------------------------------------------------------

1,创建事件处理程序

 1 /// <summary>
 2 /// 事件处理程序,真正干活的是我
 3 /// 继承IEventHandler<T>接口,其中T必须用EventWithData<TData>类型,因为EventWithData<TData>继承了EventBase类
 4 /// </summary>
 5 public class Person_EventHandler : IEventHandler<EventWithData<Person>>
 6 {
 7     //最终干活的
 8     public async Task HandleEvent(EventWithData<Person> eventWithData)
 9     {
10         var data = eventWithData.Data;
11         if (data == null)
12         {
13             Console.WriteLine("么有数据呀!");
14         }
15         else
16         {
17             try
18             {
19                 Console.WriteLine($"{DateTime.Now}-------" + JsonConvert.SerializeObject(data));
20             }
21             catch (Exception ex)
22             {
23                 Console.WriteLine("异常:" + ex.Message);
24             }
25         }
26     }
27 }

2,注册消费者、注册事件处理程序、绑定事件源和事件处理程序(在订阅端),这一步在 Startup 中实现!!!

 1 public IServiceCollection ServiceCollection { get; private set; }
 3 public void ConfigureServices(IServiceCollection services)
 4 {
 5     ServiceCollection = services;    //将services传递给ServiceCollection
 6     services.AddControllersWithViews();
 7 
 8     //注册消费者
 9     services.AddSingleton<IMessageQueue, RabbitMQConsumerClient>();
10     //注册事件处理程序,绑定事件源:IEventHandler<EventWithData<Person>>和事件处理程序:Person_EventHandler的关系
11     services.AddSingleton<Person_EventHandler>();
12 }
13 
14 public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
15 {
16     //绑定事件源和事件处理程序的映射关系  
17     //这里跟上边的绑定不是一回事,这里仅仅就是绑定了一个映射关系,等到后边事件触发的时候需要用到该处映射关系
18     EventBus.Default.RegisterHandler<EventWithData<Person>, Person_EventHandler>();
19 
20     //其它中间件。。
21     。。。
22 
23     //启动消费者订阅监控入口
24     app.Consumer(app.ApplicationServices, ServiceCollection);
25 }

3,启动消费者订阅监控,并触发事件处理程序

 1 /// <summary>
 2 /// 启动消费者订阅监控入口
 3 /// </summary>
 4 public static void Consumer(this IApplicationBuilder app, IServiceProvider provider, IServiceCollection services)
 5 {
 6     //获取实例对象 
 7     IMessageQueue queue = provider.GetService<IMessageQueue>();   
 8 
 9     //异步调用,这里接收的消息类型T为:EventWithData<Person>  也可以是其它自定义消息类型,没有限制
10     queue.Consume<EventWithData<Person>>(async message =>
11     {
12         using (var currentContainer = services.BuildServiceProvider().CreateScope())   //使用系统内置服务容器
13         {
14             IocManager.Configure(currentContainer.ServiceProvider);                //将容器服务传递过去
15             
16             var e = EventWithData<Person>.New(message.Data);      //这里要传递的消息类型必须使用 EventWithData<TData>类型初始化消息
17             
18             await EventBus.Default.TriggerAsync(e);               //调用触发事件逻辑(需要提前绑定映射关系和注册容器)
19             
20             return true;
21         }
22     });
23 }

说明:

IocManager.Configure(currentContainer.ServiceProvider)  实际上就是声明了一个全局的 IServiceProvider 变量,然后将当前的ServiceProvider赋值给全局变量

await EventBus.Default.TriggerAsync(e)   执行的就是上一节事件总线中的 public async Task TriggerAsync<TEvent>(TEvent e)  方法

到这里 事件源和事件处理程序就一一对应上了。

接下来开始测试一下是否可用,先在发布的应用程序Startup中注册一下消息队列容器

1 services.AddSingleton<IMessageQueue, RabbitMQPublishClient>();

然后发布一条消息,注意:这里消息的类型EventWithData<Person>没有限制的,你可以不使用EventWithData<Person>类型的消息也行

 1 //发布消息
 2 var message = EventWithData<Person>.New(
 3     new Person()
 4     {
 5         UserId = 1,
 6         UserName = "张三"
 7     }
 8 );
 9 
10 messageQueue.Publish(message);

最后测试,订阅者成功消费了刚刚生产的消息

标签:Core,false,RabbitMQ,queueName,事件,exchangeName,EventBus,public,channel
来源: https://www.cnblogs.com/peterzhang123/p/15357497.html