NetCore微服务实现事务一致性masstransit之saga使用
作者:互联网
demo如下,一个订单处理的小例子:
首先看看运行结果,很简单:
核心代码如下:
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrderProcessor.Event;
using ServiceModel;
using ServiceModel.Command;
using ServiceModel.DTO;
using ServiceModel.Event;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace OrderProcessor.Service
{
    /// <summary>
    /// Saga state machine that drives an order through stock reservation, payment and shipping.
    /// Each step is triggered by an event correlated on <c>CorrelationId</c>; after handling an
    /// event the saga sends the command for the next step, and it finalizes once the order has
    /// been shipped.
    /// </summary>
    public class OrderProcessorStateMachine : MassTransitStateMachine<ProcessingOrderState>
    {
        private readonly ILogger<OrderProcessorStateMachine> logger;

        /// <summary>
        /// Wires up the state property, the correlation rules and the event handlers.
        /// </summary>
        public OrderProcessorStateMachine()
        {
            this.logger = ResolveLogger();

            this.InstanceState(x => x.State);
            this.State(() => this.Processing);
            this.ConfigureCorrelationIds();

            this.Initially(this.SetOrderSubmittedHandler());
            this.During(
                Processing,
                this.SetStockReservedHandler(),
                this.SetPaymentProcessedHandler(),
                this.SetOrderShippedHandler());

            // Remove the saga instance from the repository once it reaches the final state.
            SetCompletedWhenFinalized();
        }

        /// <summary>
        /// Resolves the logger from the global service provider.
        /// </summary>
        /// <returns>The registered logger, or a no-op logger when none is registered.</returns>
        private static ILogger<OrderProcessorStateMachine> ResolveLogger()
        {
            // The scope is only needed for the lookup, so dispose it instead of leaking it.
            // With the default AddLogging registration ILogger<T> is a singleton and stays
            // valid after the scope is disposed.
            using (var scope = GlobalServiceProvider.Instance.CreateScope())
            {
                // Fall back to a no-op logger so a missing registration cannot cause a
                // NullReferenceException inside a saga handler.
                return scope.ServiceProvider.GetService<ILogger<OrderProcessorStateMachine>>()
                    ?? Microsoft.Extensions.Logging.Abstractions.NullLogger<OrderProcessorStateMachine>.Instance;
            }
        }

        /// <summary>
        /// Correlates every event to a saga instance by the message's <c>CorrelationId</c>.
        /// The submit event additionally selects that id for the instance it correlates to.
        /// </summary>
        private void ConfigureCorrelationIds()
        {
            this.Event(
                () => this.OrderSubmitted,
                x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId));
            this.Event(() => this.StockReserved, x => x.CorrelateById(c => c.Message.CorrelationId));
            this.Event(() => this.PaymentProcessed, x => x.CorrelateById(c => c.Message.CorrelationId));
            this.Event(() => this.OrderShipped, x => x.CorrelateById(c => c.Message.CorrelationId));
        }

        /// <summary>
        /// Order submitted: store the order, request a stock reservation, move to Processing.
        /// </summary>
        private EventActivityBinder<ProcessingOrderState, IOrderSubmitted> SetOrderSubmittedHandler() =>
            When(OrderSubmitted)
                .Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                .Then(c => this.logger.LogInformation("Order submitted to {CorrelationId} received", c.Data.CorrelationId))
                .ThenAsync(c => this.SendCommand<IReserveStock>("rabbitWarehouseQueue", c))
                .TransitionTo(Processing);

        /// <summary>
        /// Stock reserved: store the order and request payment processing.
        /// </summary>
        private EventActivityBinder<ProcessingOrderState, IStockReserved> SetStockReservedHandler() =>
            When(StockReserved)
                .Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                .Then(c => this.logger.LogInformation("Stock reserved to {CorrelationId} received", c.Data.CorrelationId))
                .ThenAsync(c => this.SendCommand<IProcessPayment>("rabbitCashierQueue", c));

        /// <summary>
        /// Payment processed: store the order and request shipping.
        /// </summary>
        private EventActivityBinder<ProcessingOrderState, IPaymentProcessed> SetPaymentProcessedHandler() =>
            When(PaymentProcessed)
                .Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                .Then(c => this.logger.LogInformation("Payment processed to {CorrelationId} received", c.Data.CorrelationId))
                .ThenAsync(c => this.SendCommand<IShipOrder>("rabbitDispatcherQueue", c));

        /// <summary>
        /// Order shipped: mark the order as processed, publish <c>OrderProcessed</c>, finalize.
        /// </summary>
        private EventActivityBinder<ProcessingOrderState, IOrderShipped> SetOrderShippedHandler() =>
            When(OrderShipped)
                .Then(c =>
                {
                    this.UpdateSagaState(c.Instance, c.Data.Order);
                    c.Instance.Order.Status = Status.Processed;
                })
                .Publish(c => new OrderProcessed(c.Data.CorrelationId, c.Data.Order))
                .Finalize();

        /// <summary>
        /// Copies the latest order snapshot into the saga instance and stamps its timestamps.
        /// </summary>
        /// <param name="state">Saga instance to update.</param>
        /// <param name="order">Order carried by the event being handled.</param>
        private void UpdateSagaState(ProcessingOrderState state, Order order)
        {
            // NOTE(review): Created is overwritten on every event, so it always equals Updated.
            // It probably should be set only for the initial event - confirm against the
            // ProcessingOrderState definition. DateTime.UtcNow would also be safer than local time.
            var currentDate = DateTime.Now;
            state.Created = currentDate;
            state.Updated = currentDate;
            state.Order = order;
        }

        /// <summary>
        /// Sends the next-step command, carrying the current correlation id and order.
        /// </summary>
        /// <typeparam name="TCommand">Command contract to send.</typeparam>
        /// <param name="endpointKey">Name of the destination queue.</param>
        /// <param name="context">Behavior context of the event being handled.</param>
        /// <exception cref="ArgumentException"><paramref name="endpointKey"/> is null or blank.</exception>
        private async Task SendCommand<TCommand>(string endpointKey, BehaviorContext<ProcessingOrderState, IMessage> context)
            where TCommand : class, IMessage
        {
            if (string.IsNullOrWhiteSpace(endpointKey))
            {
                throw new ArgumentException("A destination queue name is required.", nameof(endpointKey));
            }

            // The original built the address with new Uri(""), which throws UriFormatException
            // and ignored endpointKey entirely. Use MassTransit's "queue:" short address instead.
            // NOTE(review): this treats endpointKey as the queue name itself; if these keys are
            // meant to be configuration keys, resolve them to the real queue name here.
            var destination = new Uri($"queue:{endpointKey}");
            var sendEndpoint = await context.GetSendEndpoint(destination);

            await sendEndpoint.Send<TCommand>(new
            {
                CorrelationId = context.Data.CorrelationId,
                Order = context.Data.Order
            });
        }

        /// <summary>State held while the order is moving through the processing steps.</summary>
        public State Processing { get; private set; }

        /// <summary>Raised when a new order is submitted; starts the saga.</summary>
        public Event<IOrderSubmitted> OrderSubmitted { get; private set; }

        /// <summary>Raised when the order has been shipped.</summary>
        public Event<IOrderShipped> OrderShipped { get; set; }

        /// <summary>Raised when payment for the order has been processed.</summary>
        public Event<IPaymentProcessed> PaymentProcessed { get; private set; }

        /// <summary>Raised when stock for the order has been reserved.</summary>
        public Event<IStockReserved> StockReserved { get; private set; }
    }
}
using MassTransit;
using MassTransit.MongoDbIntegration.Saga;
using OrderProcessor;
using OrderProcessor.Service;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();

// Infrastructure settings come from configuration (appsettings / environment / user secrets).
// The fallbacks are the placeholder values from the original sample, kept so existing setups
// keep working; real credentials should not be stored in source.
var rabbitConnection = builder.Configuration["RabbitMq:Connection"] ?? "amqp://lx:admin@ip:5672/my_vhost";
var mongoConnection = builder.Configuration["Mongo:Connection"] ?? "connecturl";
var mongoDatabase = builder.Configuration["Mongo:Database"] ?? "db";

// The saga needs a stable, named queue; the original passed an empty string here.
var sagaQueueName = builder.Configuration["OrderSaga:QueueName"] ?? "order-processor-saga";

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // The host must be specified, otherwise an error is raised.
        cfg.Host(rabbitConnection);

        // Second-level retries: redeliver after 5, 15 and 30 minutes once the immediate retries fail.
        cfg.UseDelayedRedelivery(r => r.Intervals(
            TimeSpan.FromMinutes(5),
            TimeSpan.FromMinutes(15),
            TimeSpan.FromMinutes(30)));

        // First-level retries: five immediate attempts.
        cfg.UseMessageRetry(r => r.Immediate(5));

        cfg.ReceiveEndpoint(sagaQueueName, ep =>
        {
            ep.StateMachineSaga(
                new OrderProcessorStateMachine(),
                MongoDbSagaRepository<ProcessingOrderState>.Create(mongoConnection, mongoDatabase));
        });

        // Called last so automatically configured endpoints are set up after the manual one.
        cfg.ConfigureEndpoints(context);
    });
});

var app = builder.Build();

// Controllers are registered above, so map their routes; this does nothing if none exist.
app.MapControllers();

app.Run();
这是整个订单的几个步骤。
本想把代码都贴出来,并把过程梳理一遍给大家参考,但是时间有限,这个点已经没有那么多时间了,而且按理说我应该先把这个程序跑起来。明天还要照常上班,暂时不做过多研究。
整个demo代码:
exercise/MassTransitDemo/MassTransitSagasDemo at master · liuzhixin405/exercise (github.com)
有兴趣的话,还可以参考另一个demo:
exercise/MassTransitDemo/SagaTest-master at master · liuzhixin405/exercise (github.com)
masstransit官网:
MassTransit (masstransit-project.com)
不得不说这个东西真的很不错,不过暂时没找到中文翻译,只是大概过了一下文档,还有很多地方不清楚,英文水平有限。demo都是来自国外大佬的贡献。很遗憾,国内虽然也有这方面的文章,但是深入一点的都是国外友人贡献的,而且现成的微服务demo很多,写得也很好,项目中可视情况借鉴。
标签:NetCore,saga,private,Event,masstransit,using,Data,Order,CorrelationId 来源: https://www.cnblogs.com/morec/p/16080026.html