Springboot+Cloud+Nacos+Seata整合分布式事务
作者:互联网
一.项目架构
Springboot: 2.3.3.RELEASE
Springcloud: Hoxton.SR7
JDK: 11
Seata: 1.4.0
Mysql: 8.0.21
Nacos: 1.3.2
这里只介绍Seata的安装及配置步骤,其他的服务如Mysql、Nacos等略过,大家可以自行安装
二.安装Seata
1.下载
seata: https://github.com/seata/seata/releases
根据版本下载服务端,项目使用的是1.4.0版本
下载完成后解压文件结构如下
bin:启动命令
conf:配置文件
lib:依赖包等
2.配置
进入到conf文件夹
这里一般我们需要修改的文件有两个:
file.conf
registry.conf
file.conf
项目里需要修改的地方是数据源的配置,store数据仓库配置的方式有三种 file db redis,项目里使用的是mysql,所以设置mode="db",这里需要注意的是不同的Mysql版本需要配置对应的驱动
因为项目用的是Mysql是8.0.21,所以驱动设置的是 driverClassName = "com.mysql.cj.jdbc.Driver"
数据库配置用的druid,所以设置datasource = "druid"
然后就可以设置数据库的账号、密码、使用的数据库等
到这里file.conf就配置好了,其他的配置大家可以自己根据需要再进行配置
registry.conf
项目里seata服务端的配置文件读的本地,所以只需要更改registry的配置
registry支持的类型比较多,大家可以根据实际情况来选择,项目里因为整合了Nacos,所以这里我们设置 type="nacos"
nacos的配置
application:服务名称
sercerAddr:Nacos的ip+port,比如:127.0.0.1:8848
group:Nacos分组,比如:DEFAULT_GROUP
namespace:Nacos空间别名,根据实际情况配置
3.初始化数据库脚本
脚本下载:https://github.com/seata/seata/tree/develop/script/server/db
这里选择对应的数据库脚本,下载完成后在配置的数据里执行即可(项目里是单独给seata配置了一个数据源)
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
执行完成后,会初始化下面三张表给服务端使用
4.启动Seata服务端
通过上面的配置,我们就可以进入刚才解压好的seata文件的bin目录中
window 启动脚本(双击即可)->seata-server.bat
linux 启动脚本->seata-server.sh
以window为例,双击seata-server.bat后弹出命令框,提示
Server started, listen port: 8091表示本地启动成功
查看Nacos服务列表,在对应的命名空间下看到有对应的Seata服务端已经注册到Nacos上了,到此Seata服务端启动完成
三、服务端配置
1.项目中模拟分布式事务的业务场景
测试中涉及到四个服务 manage member order product
业务流(这里只是简单模拟,不是真实业务场景):
manage提供测试入口,依次调用product商品服务扣减库存,member会员服务扣减积分,order订单服务创建订单,模拟异常查看测试结果
2.配置说明
a.manage member order product 的配置文件中需要指定
seata.tx-service-group
seata.tx-service-group=my_test_tx_group(默认的)
注意这里的 my_test_tx_group 必须要服务端的一致,如果需要改这个配置,服务端也必须要改
b.pom.xml
manage member order product 的pom文件分别需要导入seata的依赖jar(项目使用的Maven)
spring-cloud-starter-alibaba-seata
seata-spring-boot-starter
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.3.RELEASE</version>
<exclusions>
<exclusion`>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.0</version>
</dependency>
刚开始引入的时候,项目里报cglib依赖冲突的错误,最后查询资料上说是因为项目里整合mybatis,mybatis的cglib需要2.x.x的版本,否则就会冲突
最后修改为如下红色所示,如果没有出现依赖冲突,按照上面的配置即可,如果出现cglib冲突,可以参考下面的配置
c.数据库初始化
下载地址: https://github.com/seata/seata/blob/develop/script/client/at/db/mysql.sql
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
因为项目是分库的,所以需要在每个微服务对应的数据源中初始化一个自己的undo_log表
结合项目的业务场景需要初始化三张简单的业务表
member_test: 会员表->用于扣减积分
order_test: 订单表->用于创建订单
product_test: 商品表->用户扣减库存
CREATE TABLE `member_test` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`member_id` int NOT NULL DEFAULT '0' COMMENT '会员id',
`integral` int NOT NULL DEFAULT '0' COMMENT '积分',
PRIMARY KEY (`id`),
UNIQUE KEY `un_member_id` (`member_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `order_test` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`order_no` varchar(50) NOT NULL COMMENT '订单编号',
`member_id` int NOT NULL DEFAULT '0' COMMENT '会员id',
`product_id` int NOT NULL COMMENT '商品id',
`quantity` int NOT NULL DEFAULT '0' COMMENT '数量',
`amount` decimal(12,2) NOT NULL DEFAULT '0.00' COMMENT '金额',
PRIMARY KEY (`id`),
UNIQUE KEY `un_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `product_test` (
`id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
`product_name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
`inventory` int NOT NULL DEFAULT '0' COMMENT '库存',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
4.项目部署
manage: 统一服务管理,测试入口
Controller
package intelligence.core.manage.controller.test;
import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.manage.service.test.TransactionService;
import intelligence.library.common.annotation.AuthIgnore;
import intelligence.library.common.vo.base.ResponseVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author intelligence
*/
@Api(tags = {"事务测试管理"})
@RestController
@RequestMapping("/api/${api.name}/seata")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TransactionController {
private final TransactionService transactionService;
@ApiOperation("测试")
@PostMapping("test")
@AuthIgnore
public ResponseVo test(@RequestBody SeataTestDto dto) {
return transactionService.test(dto);
}
}
Dto
package intelligence.core.dependent.dto.test;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author intelligence
*/
@Data
@ApiModel
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SeataTestDto {
@ApiModelProperty("会员id")
private int memberId;
@ApiModelProperty("商品id")
private int productId;
@ApiModelProperty("商品数量")
private int quantity;
}
Service
这里的一个关键注解用于开启全局事务
@GlobalTransactional 用户统一入口的全局事务管理,项目加在manage上
@Transactional 各个微服务方法上添加此注解用于开启本地事务,项目里加在 meber product order三个服务对应的方法上
这里分别注入 member product order的client用于服务间的调用(项目整合了Feign用来实现微服务调用)
package intelligence.core.manage.service.test;
import intelligence.core.dependent.client.member.MemberTestClient;
import intelligence.core.dependent.client.order.OrderTestClient;
import intelligence.core.dependent.client.product.ProductTestClient;
import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.library.common.dto.exception.CommonException;
import intelligence.library.common.vo.base.ResponseVo;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author intelligence
*/
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TransactionService {
private final OrderTestClient orderTestClient;
private final ProductTestClient productTestClient;
private final MemberTestClient memberTestClient;
@GlobalTransactional(rollbackFor = Exception.class)
public ResponseVo test(SeataTestDto dto) {
//扣减库存
ResponseVo productVo = productTestClient.test(dto);
if (!productVo.ok()) {
throw CommonException.from("扣减库存失败");
}
//扣减积分
ResponseVo memberVo = memberTestClient.test(dto);
if (!memberVo.ok()) {
throw CommonException.from("扣减积分失败");
}
//创建订单
ResponseVo orderVo = orderTestClient.test(dto);
if (!orderVo.ok()) {
throw CommonException.from("创建订单失败");
}
//模拟异常
throw CommonException.from("创建订单失败");
}
}
member: 会员服务
package intelligence.core.member.service;
import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.dependent.entity.member.MemberTest;
import intelligence.core.member.mapper.MemberTestMapper;
import intelligence.library.common.dto.exception.CommonException;
import intelligence.library.common.utils.base.ResponseUtils;
import intelligence.library.common.vo.base.ResponseVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author intelligence
*/
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MemberTestService {
private final MemberTestMapper memberTestMapper;
@Transactional(rollbackFor = Exception.class)
public ResponseVo test(SeataTestDto dto) {
MemberTest memberTest = memberTestMapper.findByMemberId(dto.getMemberId());
if (null == memberTest) {
throw CommonException.from("会员积分记录不存在");
}
if (dto.getQuantity() * 10 > memberTest.getIntegral()) {
throw CommonException.from("会员积分不足");
}
memberTest.setIntegral(memberTest.getIntegral() - dto.getQuantity() * 10);
memberTestMapper.update(memberTest);
return ResponseUtils.success();
}
}
order: 订单服务
package intelligence.core.order.service.service;
import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.dependent.entity.order.OrderTest;
import intelligence.core.order.mapper.order.OrderTestMapper;
import intelligence.library.common.utils.base.RandomUtils;
import intelligence.library.common.utils.base.ResponseUtils;
import intelligence.library.common.vo.base.ResponseVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
/**
* @author intelligence
*/
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderTestService {
@Value("${order.prefix}")
private String orderPreFix;
private final OrderTestMapper orderTestMapper;
@Transactional(rollbackFor = Exception.class)
public ResponseVo test(SeataTestDto dto) {
OrderTest orderTest = OrderTest.builder()
.amount(BigDecimal.ONE)
.memberId(dto.getMemberId())
.orderNo(orderPreFix + RandomUtils.getOrderCode(16))
.productId(dto.getProductId())
.quantity(dto.getQuantity())
.build();
orderTestMapper.insert(orderTest);
return ResponseUtils.success();
}
}
product: 商品服务
package intelligence.core.product.service.product;
import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.dependent.entity.product.ProductTest;
import intelligence.core.product.mapper.product.ProductTestMapper;
import intelligence.library.common.dto.exception.CommonException;
import intelligence.library.common.utils.base.ResponseUtils;
import intelligence.library.common.vo.base.ResponseVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* @author intelligence
*/
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductTestService {
private final ProductTestMapper productTestMapper;
@Transactional(rollbackFor = Exception.class)
public ResponseVo test(SeataTestDto dto) {
ProductTest productTest = productTestMapper.findById(dto.getProductId());
if (null == productTest) {
throw CommonException.from("商品信息不存在");
}
if (dto.getQuantity() > productTest.getInventory()) {
throw CommonException.from("商品可用库存不足");
}
productTest.setInventory(productTest.getInventory() - dto.getQuantity());
productTestMapper.update(productTest);
return ResponseUtils.success();
}
}
启动manage product member order服务
transactionServiceGroup='my_test_tx_group' 为我们刚配置的事务组,看到如下内容表示事务注册成功
查看Nacos服务列表,包含Seata的服务端,所有测试的服务已经全部注册好了
五、测试
模拟场景: id=1的会员 购买1个id=1的商品
这里只是简单的模拟一个异常场景下的全局事务回滚
会员1有10积分
商品1有一个库存
订单数据为空
发送请求
1.这时Seata数据源中的global_table中有一条全局事务的记录,
2.当库存扣减成功后(进入到扣减积分逻辑),查看商品库存表的库存已经变成了0,商品数据源中的undo_log中也有一条分支事务的记录
3.积分扣减成功后(进入到创建订单逻辑),查看会员的积分已经变成了0(扣了10积分),会员数据源中的undo_log中也有一条分支事务的记录
4.创建订单后,订单表中多了一条订单记录,订单数据源中的undo_log中也有一条分支事务的记录
5.模拟异常,全局事务回滚,商品库存回滚为1,会员积分回滚为10,订单数据清空,全局事务处理成功
标签:dto,Springboot,intelligence,seata,Nacos,test,import,id,Seata 来源: https://blog.csdn.net/weixin_40723662/article/details/111163507