分布式事务(Saga模式)
作者:互联网
分布式事务(Saga模式)(解决mq,api分布式事务问题)
一 概览
1.分布式事务包含两个组件TxServer和TxClient
TxServer是分布式事务协调器,主要负责接收每个参与者事务开启和结束的上报信息,并做持久化。协调补偿子事务,达到事务的最终一致。
TxClient是分布式事务客户端,主要负责上报事务开启和结束结果信息。
2.业务接入
业务服务需要引入TxClient依赖
<dependency>
<groupId>com.mw</groupId>
<artifactId>mw-transaction-client-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>com.mw</groupId>
<artifactId>mw-transaction-common</artifactId>
</exclusion>
</exclusions>
</dependency>
3.api接口接入分布式事务
4.分布式事务应用案例
1.由服务A开启分布式事务,生成全局事务Id以及服务A的本地事务Id,上报到协调器开启事务。
2.服务A开始执行业务,调用服务B
3.服务B开启本地事务,上报到协调器
4.服务B执行业务
5.服务B业务执行完成,向协调器上报本地事务执行结果,服务B本地事务结束
6.服务B返回数据给服务A
7.服务A继续执行后续业务
8.服务A执行完成,向协调器上报全局事务执行结果,全局事务结束
5.成功场景:
所有服务都执行成功,协调器记录所有事务为已完成状态。
服务B异常场景:
B服务发生异常,A服务也会发生异常,AB两个子事务都是异常状态,不需要补偿。
服务A在服务B执行成功后发生异常场景:
服务B本地事务为成功状态,服务A本地事务发生异常,全局事务异常,补偿服务B业务,使服务B本地事务和全局事务达到一致。
服务B执行超时场景:
服务A在调用服务B时发生超时,则服务A本地事务发生异常,全局事务异常。此时服务可能还没执行完,协调器会定期扫描。如果服务B最终执行失败,则不进行补偿,最终执行成功,则进行补偿。
接入代码示例:
在服务A接口上添加@SagaStart注解,开启分布式事务
@GetMapping("/test")
@SagaStart(serviceName = "mw-crm-service-biz2")
public String test() throws Exception {
UserDTO userDTO = new UserDTO();
userDTO.setUserName("ACE");
userDTO.setAge(20);
crm1Api.service1(userDTO);
return "success";
}
在服务B接口上添加@TxCompensation注解,成为事务链的子事务。
参数compensationServiceName为全局事务发生异常后,协调器调用的补偿服务,compensationServicePath则为补偿接口路径,补偿接口参数直接拿原接口的参数
@PostMapping("/service1")
@TxCompensation(serviceName = "mw-crm-service-biz1", compensationServiceName = "mw-crm-service-biz1", compensationServicePath = "/crm1/compen/service1")
public String service1(@RequestBody UserDTO userDTO) {
return "service1";
}
@GetMapping("/service3")
@TxCompensation(serviceName = "mw-crm-service-biz1#service3", compensationServiceName = "mw-crm-service-biz1", compensationServicePath = "/crm1/compen/service3")
public String service3(@RequestParam("userName") String userName, @RequestParam("age") Integer age) {
return "service3";
}
业务补偿接口规范:
如果业务接口请求参数为对象实体,补偿接口通过json对象接收参数
@PostMapping("/compen/service1")
public ApiResult compenService(@RequestBody UserDTO userDTO) {
log.info("完成补偿1{}", userDTO);
return ApiResult.build(Code.Code_Success.getName(), Code.Code_Success.getValue());
}
如果业务接口请求参数为String或者基础数据类型,补偿接口使用restful方式接收
@PostMapping("/compen/service3/{p1}/{p2}")
public ApiResult compenService2(@PathVariable String p1, @PathVariable Integer p2) {
log.info("完成补偿p1{}, p2{}", p1, p2);
return ApiResult.build(Code.Code_Success.getName(), Code.Code_Success.getValue());
}
二 分布式事务Client中间件的原理
1.首先拦截@SagaStart和@TxCompensation 注解开启事务
2.执行wrapper中apply方法
目的:开启事务 上报主事务
createStartTxEvent方法:组装补偿数据(服务名称,url,方法,参数,token等数据)
createBaseTxEnent方法:上报事务组装数据(同上)
exePreIntercept方法:执行上报逻辑(采用rpc框架中的ThriftClient类似于openFeign等常见rpc)
3.执行wrapper中compensation方法
目的:开启子事务
执行逻辑和同上
三 分布式事务Client中间件的原理
1.定时任务对需要补偿的事务进行补偿
2.调用checkByGlobalTxId进行事务补偿
2.调用compensationHandler进行事务补偿
执行补偿逻辑
标签:事务,Code,服务,Saga,mw,补偿,分布式 来源: https://blog.csdn.net/qq_40354931/article/details/115304290