事件协调器(saga)
作者:互联网
一、整体流程
首先我们通过一个讲的比较烂的业务模型(库存-余额-订单)来简述一下saga是如何实现分布式事务的。然后再讲解一下saga实现这套流程都做了哪些工作来让大家对分布式事务有一个清晰的认知。首先来讲一下为什么要实现一个分布式事务。当我们的应用通过业务拆解后以物理隔离的模式运行在不同的物理机/虚拟机/容器上并且我们的数据库也做了相应的隔离后,我们没有办法通过发布一个单一的原子的ACID事务来达到事务的一致性。单个数据库在运行事务时通过对应的机制(诸如主流的MVCC多版本并发控制)来确保ACID。但是在跨多个数据库的情况下,各家数据库厂商就只能通过二阶段提交的XA协议来实现分布式事务。这种方案确实在一定程度上降低了分布式事务的复杂性不过性能上却无法做到很好的平衡。而更加常见的办法是通过应用自身调用各自的数据库原子事务以链条的形式来完成分布式事务,而saga作为其中的一种方案已经在各大企业的业务场景中被广泛的实践过了。
下面我们就看看一个比较典型电商下单的场景,当电商注册用户在电商平台完成内用金充值后可以直接在平台上下单购买商品。下单的流程如下:点击购物车下订单->预扣库存->预扣内用金->创建订单,流程如下:
在理想情况下当我们下订单顺利时,每一个服务只需要包裹各自的事务(扣库存/扣余额/创建订单),通过调用链即可完成一个完整的订单创建业务。然而由于数据库存在物理隔离,很多时候我们需要面对这种情况:
这个时候saga的作用就体现出来了,它可以通过事件订阅/发布的形式帮你完成1-8的自动化的调用或者补偿调用,你需要做的只是一个配置和把对应的订阅/补偿方法给注册到这个配置的主题上即可。接下来我们就讲讲saga是如何一步一步来实现这套逻辑的。
二、配置
*下面的所有实现均以Dapr实现集成为例,即(Oxygen-Saga.PubSub.Dapr + Oxygen-Saga.Store.Dapr),但是整体集成逻辑和(Oxygen-Saga.PubSub.Rabbitmq + Oxygen-Saga.Store.Redis) 区别不大,只有部分队列和持久化实现的区别。
首先我们需要一个管理所有分布式事务配置的管理包项目,这个包会作为一个通用的nuget or 项目被其他具体的业务服务继承。然后所有的Saga配置都应该在这个包中编写对应的配置文件。同时每个服务会引入这个包并且创建自己的SagaConfiguration配置文件用于向saga服务申明自己需要创建Saga服务
三、注册
注册的过程主要分为三个部分,一个部分就是需要把之前Topic配置文件引入到本地并形成我们的Saga配置,一部分是引入我们需要的第三方的组件来支持整个saga运行起来,另一部分是创建对应的订阅服务代理来支持对事件的订阅和分发。
四、流转
当整个注册工作完成后启动项目会按照上图所示进行相关的注册和构造代理的工作,接下来就是具体业务的流转过程,所有的订阅会被一个SagaSubscribe(由上一步的RegisterSagaHandler触发并注入ISagaEventHandler后创建所得)接受到,并通过该Hub完成对应的路由投递到具体的方法函数进行业务流转。整个Hub结构如下所示,可以看到其实Saga是在每个服务注册了一个订阅器,通过订阅器接收到其他业务发布的Saga事件后通过代理的方式路由到具体的业务实现上来实现流程代理的。
那Saga又是怎么实现自动化的发布下一个事件或者根据代理调用目标函数抛出异常后自动进行补偿事件的呢,这就要看Saga的传递模型结构了,整个模型结构如下图,可以看到这里的Topic其实就是对应到的上图的TopicName,所以当Proxy收到事件解析出Topic后即可路由到具体的函数进行处理,当函数处理完毕后,Proxy即可根据链表当前所在的节点获取Next,如果Next不为空,则发送当前函数回调的Result封装成一个消息包投递到Next所属的服务,如果Next为空则说明整个流程结束,不再执行任何操作。
五、异常
同样的由于不能确保所有的业务能够完完全全的处理完毕自己业务比如当库存不足时,余额不足时,这时候就需要业务端自行通过触发异常的方式回调上一步的补偿,这样代理就会尝试从链表中获取Previous,如果Previous不为空,则发送当前函数抛出异常(SagaException<T>)回调的Result封装成一个消息包投递到Previous所属的服务,如果Previous为空则说明整个补偿流程结束,不再执行任何操作。不过还有一种情况,就是当前目标函数触发的是非SagaException异常,这种情况下Saga没有办法获取到上一步补偿所需的数据,所以这个时候就只能交给人工处理,也就是在注册部分第三步RegisterSagaHandler可以注入一个异常处理程序,当发生异常Saga无法处理时我们会尝试向Func<IServiceProvider, ErrorModel, Task> errorHandle 函数投递ErrorModel,由客户端自行决定如何处理异常。
六、持久化
所有的消息在流转过程中会被包装为一个SagaData类型持久化到Store中,并根据流转类型被赋予三个状态,分别是Processing,Done,Error。这部分数据会在Store中保存24小时。未来扩展中会提供相应的接口获取这部分数据用于框架扩展。
目录:
一、通过Dapr实现一个简单的基于.net的微服务电商系统
二、通过Dapr实现一个简单的基于.net的微服务电商系统(二)——通讯框架讲解
三、通过Dapr实现一个简单的基于.net的微服务电商系统(三)——一步一步教你如何撸Dapr
四、通过Dapr实现一个简单的基于.net的微服务电商系统(四)——一步一步教你如何撸Dapr之订阅发布
五、通过Dapr实现一个简单的基于.net的微服务电商系统(五)——一步一步教你如何撸Dapr之状态管理
六、通过Dapr实现一个简单的基于.net的微服务电商系统(六)——一步一步教你如何撸Dapr之Actor服务
七、通过Dapr实现一个简单的基于.net的微服务电商系统(七)——一步一步教你如何撸Dapr之服务限流
八、通过Dapr实现一个简单的基于.net的微服务电商系统(八)——一步一步教你如何撸Dapr之链路追踪
九、通过Dapr实现一个简单的基于.net的微服务电商系统(九)——一步一步教你如何撸Dapr之OAuth2授权 && 百度版Oauth2
十、通过Dapr实现一个简单的基于.net的微服务电商系统(十)——一步一步教你如何撸Dapr之绑定
十一、通过Dapr实现一个简单的基于.net的微服务电商系统(十一)——一步一步教你如何撸Dapr之自动扩/缩容
十二、通过Dapr实现一个简单的基于.net的微服务电商系统(十二)——istio+dapr构建多运行时服务网格
十三、通过Dapr实现一个简单的基于.net的微服务电商系统(十三)——istio+dapr构建多运行时服务网格之生产环境部署
十四、通过Dapr实现一个简单的基于.net的微服务电商系统(十四)——开发环境容器调试小技巧
十五、通过Dapr实现一个简单的基于.net的微服务电商系统(十五)——集中式接口文档实现
十六、通过Dapr实现一个简单的基于.net的微服务电商系统(十六)——dapr+sentinel中间件实现服务保护
十七、通过Dapr实现一个简单的基于.net的微服务电商系统(十七)——服务保护之动态配置与热重载
十八、通过Dapr实现一个简单的基于.net的微服务电商系统(十八)——服务保护之多级缓存
十九、通过Dapr实现一个简单的基于.net的微服务电商系统(十九)——分布式事务之Saga模式
二十、通过Dapr实现一个简单的基于.net的微服务电商系统(二十)——Saga框架实现思路分享
附录:(如果你觉得对你有用,请给个star)
一、电商Demo地址
标签:服务,saga,协调,Saga,Dapr,事件,net,电商,实现 来源: https://www.cnblogs.com/Leo_wl/p/16246160.html