spring cloud 分布式事务 seata
作者:互联网
seata安装与部署
什么是seata?
Seata
是阿里开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。
官方中文文档:https://seata.io/zh-cn/docs/dev/mode/at-mode.html
数据库事务的基本概念(ACID)
原子性(Atomicity):操作这些指令时,要么全部执行成功,要么全部不执行。只要其中一个指令执行失败,所有的指令都执行失败,数据进行回滚,回到执行指令前的数据状态。要么执行,要么不执行
一致性(Consistency):事务的执行使数据从一个状态转换为另一个状态,数据库的完整性约束没有被破坏。能量守恒,总量不变
隔离性(Isolation):隔离性是当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。信息彼此独立,互不干扰
持久性(Durability):当事务正确完成后,它对于数据的改变是永久性的。不会轻易丢失
什么是分布式事务
指一次大的操作由不同的小操作组成的,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。从本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
简单的说,在分布式系统上,一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务节点上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
举个例子:在电商网站中,用户对商品进行下单,需要在订单表中创建一条订单数据,同时需要在库存表中修改当前商品的剩余库存数量,两步操作一个添加,一个修改,我们一定要保证这两步操作一定同时操作成功或失败,否则业务就会出现问题。
为什么要使用分布式事务
在微服务独立数据源的思想,每一个微服务都有一个或者多个数据源,虽然单机单库事务已经非常成熟,但是由于网路延迟和不可靠的客观因素,分布式事务到现在也还没有成熟的方案,对于中大型网站,特别是涉及到交易的网站,一旦将服务拆分微服务,分布式事务一定是绕不开的一个组件,通常解决分布式事务问题。
Seata为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案;
四种事务模式中,XA模式正在开发中...,其他事务模式已经实现;
目前使用的流行度情况是:AT > TCC > Saga;
大部分公司都采用的AT事务模式;
常见的分布式事务解决方案
1、使用MQ
2、使用LCN
3、使用Seata
Seata AT 模式
前提
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
整体机制
两阶段提交协议:
- 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
- 二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
写隔离
- 一阶段本地事务提交前,需要确保先拿到 全局锁 。
- 拿不到 全局锁 ,不能提交本地事务。
- 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。
tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 。
tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 才拿到 全局锁 提交本地事务
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。
分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
工作机制
业务表:product
Field Type Key
id bigint(20) PRI
name varchar(100)
since varchar(100)
AT 分支事务的业务逻辑:
update product set name = '123' where name = '666';
阶段一
过程:
- 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = '666')等相关的信息。
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = '666';
得到前镜像:
id | name | since |
---|---|---|
1 | 666 | 2014 |
- 执行业务 SQL:更新这条记录的 name 为 '666'。
- 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
select id, name, since from product where id = 1`;
得到后镜像:
id | name | since |
---|---|---|
1 | 123 | 2014 |
- 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到
UNDO_LOG
表中。
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [
{"name": "id","type": 4,"value": 1},
{"name": "name","type": 12,"value": "666"},
{"name": "since","type": 12,"value": "2014"}
]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [
{"name": "id","type": 4,"value": 1},
{"name": "name","type": 12,"value": "123"},
{"name": "since","type": 12,"value": "2014"}
]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
- 提交前,向 TC 注册分支:申请
product
表中,主键值等于 1 的记录的 全局锁 。 - 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
- 将本地事务提交的结果上报给 TC。
阶段二-回滚
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = '123' where id = 1;
- 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
阶段二-提交
-
收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
-
异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
1、下载seata
下载地址:https://github.com/seata/seata/releases
下载seata-server-$version.zip
包。
网盘地址: https://pan.baidu.com/s/1E9J52g6uW_VFWY34fHL6zA 提取码: vneh
我们是基于file的方式启动注册和承载配置的
1.2、启动seata
下载解压后(.zip),直接点击bin/seata-server.bat
就可以了。(我使用的是1.4.0版本)
启动完成后:
2、使用seata
因为seata需要进行全局事务和分支事务的记录,所以需要对应的存储,目前,TC有三种存储模式:
- file模式:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高;
- db模式:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点;
- redis模式:解决db存储的性能问题;
2.1、创建相关测试数据库和表。
# 订单数据库信息 seata_order
DROP DATABASE IF EXISTS seata_order;
CREATE DATABASE seata_order;
DROP TABLE IF EXISTS seata_order.p_order;
CREATE TABLE seata_order.p_order
(
id INT(11) NOT NULL AUTO_INCREMENT,
user_id INT(11) DEFAULT NULL,
product_id INT(11) DEFAULT NULL,
amount INT(11) DEFAULT NULL,
total_price DOUBLE DEFAULT NULL,
status VARCHAR(100) DEFAULT NULL,
add_time DATETIME DEFAULT CURRENT_TIMESTAMP,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_order.undo_log;
CREATE TABLE seata_order.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
# 产品数据库信息 seata_product
DROP DATABASE IF EXISTS seata_product;
CREATE DATABASE seata_product;
DROP TABLE IF EXISTS seata_product.product;
CREATE TABLE seata_product.product
(
id INT(11) NOT NULL AUTO_INCREMENT,
price DOUBLE DEFAULT NULL,
stock INT(11) DEFAULT NULL,![image](uploading...)
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_product.undo_log;
CREATE TABLE seata_product.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
INSERT INTO seata_product.product (id, price, stock)
VALUES (1, 10, 20);
# 账户数据库信息 seata_account
DROP DATABASE IF EXISTS seata_account;
CREATE DATABASE seata_account;
DROP TABLE IF EXISTS seata_account.account;
CREATE TABLE seata_account.account
(
id INT(11) NOT NULL AUTO_INCREMENT,
balance DOUBLE DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
DROP TABLE IF EXISTS seata_account.undo_log;
CREATE TABLE seata_account.undo_log
(
id BIGINT(20) NOT NULL AUTO_INCREMENT,
branch_id BIGINT(20) NOT NULL,
xid VARCHAR(100) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT(11) NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4;
INSERT INTO seata_account.account (id, balance)
VALUES (1, 50);
其中,每个库中的undo_log
表,是Seata AT
模式必须创建的表,主要用于分支事务的回滚。
2.2、项目引用
引入platform-common-datasource
依赖(包含seata
配置)
<!-- ruoyi common datasource -->
<dependency>
<groupId>com.heyuht.hospital</groupId>
<artifactId>platform-common-datasource</artifactId>
</dependency>
2.3、配置文件
server:
port: 9900
# spring配置
spring:
application:
name: platform-seata
redis:
host: 192.168.2.127
port: 6379
password:
datasource:
druid:
stat-view-servlet:
enabled: true
loginUsername: admin
loginPassword: 123456
dynamic:
druid:
initial-size: 5
min-idle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
filters: stat,wall,slf4j
connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
datasource:
# 主库数据源
master:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.2.127:3366/ry-cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: yyrj#2021
# seata_order数据源
order:
username: root
password: yyrj#2021
url: jdbc:mysql://192.168.2.127:3366/seata_order?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
# seata_account数据源
account:
username: root
password: yyrj#2021
url: jdbc:mysql://192.168.2.127:3366/seata_account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
# seata_product数据源
product:
username: root
password: yyrj#2021
url: jdbc:mysql://192.168.2.127:3366/seata_product?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
driver-class-name: com.mysql.cj.jdbc.Driver
seata: true #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭
# seata配置
seata:
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${spring.application.name}
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${spring.application.name}-group
# 关闭自动代理
enable-auto-data-source-proxy: false
# 服务配置项
service:
# 虚拟组和分组的映射
vgroup-mapping:
platform-seata-group: default
# 分组和 Seata 服务的映射
grouplist:
default: 127.0.0.1:8091
config:
type: file
registry:
type: file
mybatis-plus:
mapperLocations: classpath*:mapper/**/*Mapper.xml
typeAliasesPackage: com.heyuht.hospital.**.domain
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
2.4、示例代码
Domain
Account.java
package com.heyuht.hospital.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* 用户
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("account")
public class Account implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 余额
*/
private Double balance;
private Date lastUpdateTime;
}
Order.java
package com.heyuht.hospital.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 订单
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("p_order")
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 用户ID
*/
private Long userId;
/**
* 商品ID
*/
private Long productId;
/**
* 订单状态
*/
private int status;
/**
* 数量
*/
private Integer amount;
/**
* 总金额
*/
private Double totalPrice;
public Order(Long userId, Long productId, int status, Integer amount)
{
this.userId = userId;
this.productId = productId;
this.status = status;
this.amount = amount;
}
}
Product.java
package com.heyuht.hospital.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* 产品
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("product")
public class Product implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 价格
*/
private Double price;
/**
* 库存
*/
private Integer stock;
private Date lastUpdateTime;
}
Dto
PlaceOrderRequest.java
package com.heyuht.hospital.domain.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PlaceOrderRequest {
private Long userId;
private Long productId;
private Integer amount;
}
ReduceBalanceRequest.java
package com.heyuht.hospital.domain.dto;
import lombok.Data;
@Data
public class ReduceBalanceRequest {
private Long userId;
private Integer price;
}
ReduceStockRequest.java
package com.heyuht.hospital.domain.dto;
import lombok.Data;
@Data
public class ReduceStockRequest {
private Long productId;
private Integer amount;
}
Mapper
AccountMapper.java
package com.heyuht.hospital.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heyuht.hospital.domain.Account;
/**
* <p>
* 用户表 Mapper 接口
* </p>
*
* @author galen
* @since 2021-07-23
*/
public interface AccountMapper extends BaseMapper<Account> {
}
OrderMapper.java
package com.heyuht.hospital.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heyuht.hospital.domain.Order;
public interface OrderMapper extends BaseMapper<Order> {
}
ProductMapper.java
package com.heyuht.hospital.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heyuht.hospital.domain.Product;
public interface ProductMapper extends BaseMapper<Product> {
}
Service
AccountService.java
package com.heyuht.hospital.service;
public interface AccountService {
/**
* 账户扣减
*
* @param userId 用户 ID
* @param price 扣减金额
*/
void reduceBalance(Long userId, Double price);
}
OrderService.java
package com.heyuht.hospital.service;
import com.heyuht.hospital.domain.dto.PlaceOrderRequest;
public interface OrderService {
/**
* 下单
*
* @param placeOrderRequest 订单请求参数
*/
void placeOrder(PlaceOrderRequest placeOrderRequest);
}
ProductService.java
package com.heyuht.hospital.service;
public interface ProductService {
/**
* 扣减库存
*
* @param productId 商品 ID
* @param amount 扣减数量
* @return 商品总价
*/
Double reduceStock(Long productId, Integer amount);
}
ServiceImpl
AccountServiceImpl.java
package com.heyuht.hospital.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.heyuht.hospital.domain.Account;
import com.heyuht.hospital.mapper.AccountMapper;
import com.heyuht.hospital.service.AccountService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service
public class AccountServiceImpl implements AccountService {
private static final Logger log = LoggerFactory.getLogger(AccountServiceImpl.class);
@Resource
private AccountMapper accountMapper;
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("account")
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void reduceBalance(Long userId, Double price) {
log.info("=============ACCOUNT START=================");
log.info("当前 XID: {}", RootContext.getXID());
Account account = accountMapper.selectById(userId);
Double balance = account.getBalance();
log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);
if (balance < price) {
log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
throw new RuntimeException("余额不足");
}
log.info("开始扣减用户 {} 余额", userId);
double currentBalance = account.getBalance() - price;
account.setBalance(currentBalance);
accountMapper.updateById(account);
log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
log.info("=============ACCOUNT END=================");
}
}
OrderServiceImpl.java
package com.heyuht.hospital.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.heyuht.hospital.domain.Order;
import com.heyuht.hospital.domain.dto.PlaceOrderRequest;
import com.heyuht.hospital.mapper.OrderMapper;
import com.heyuht.hospital.service.AccountService;
import com.heyuht.hospital.service.OrderService;
import com.heyuht.hospital.service.ProductService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service
public class OrderServiceImpl implements OrderService {
private static final Logger log = LoggerFactory.getLogger(OrderServiceImpl.class);
@Resource
private OrderMapper orderMapper;
@Autowired
private AccountService accountService;
@Autowired
private ProductService productService;
@DS("order") // 每一层都需要使用多数据源注解切换所选择的数据库
@Override
@Transactional
@GlobalTransactional // 重点 第一个开启事务的需要添加seata全局事务注解
public void placeOrder(PlaceOrderRequest request) {
log.info("=============ORDER START=================");
Long userId = request.getUserId();
Long productId = request.getProductId();
Integer amount = request.getAmount();
log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);
log.info("当前 XID: {}", RootContext.getXID());
Order order = new Order(userId, productId, 0, amount);
orderMapper.insert(order);
log.info("订单一阶段生成,等待扣库存付款中");
// 扣减库存并计算总价
Double totalPrice = productService.reduceStock(productId, amount);
// 扣减余额
accountService.reduceBalance(userId, totalPrice);
order.setStatus(1);
order.setTotalPrice(totalPrice);
orderMapper.updateById(order);
log.info("订单已成功下单");
log.info("=============ORDER END=================");
}
}
ProductServiceImpl.java
package com.heyuht.hospital.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.heyuht.hospital.domain.Product;
import com.heyuht.hospital.mapper.ProductMapper;
import com.heyuht.hospital.service.ProductService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service
public class ProductServiceImpl implements ProductService {
private static final Logger log = LoggerFactory.getLogger(ProductServiceImpl.class);
@Resource
private ProductMapper productMapper;
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*
* @return
*/
@DS("product")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Double reduceStock(Long productId, Integer amount) {
log.info("=============PRODUCT START=================");
log.info("当前 XID: {}", RootContext.getXID());
// 检查库存
Product product = productMapper.selectById(productId);
Integer stock = product.getStock();
log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);
if (stock < amount) {
log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
throw new RuntimeException("库存不足");
}
log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
// 扣减库存
int currentStock = stock - amount;
product.setStock(currentStock);
productMapper.updateById(product);
double totalPrice = product.getPrice() * amount;
log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
log.info("=============PRODUCT END=================");
return totalPrice;
}
}
Controller
OrderController.java
package com.heyuht.hospital.controller;
import com.heyuht.hospital.service.OrderService;
import com.heyuht.hospital.domain.dto.PlaceOrderRequest;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping("/placeOrder")
public String placeOrder(@Validated @RequestBody PlaceOrderRequest request) {
orderService.placeOrder(request);
return "下单成功";
}
@PostMapping("/test1")
@ApiOperation("测试商品库存不足-异常回滚")
public String test1() {
// 商品单价10元,库存20个,用户余额50元,模拟一次性购买22个。 期望异常回滚
orderService.placeOrder(new PlaceOrderRequest(1L, 1L, 22));
return "下单成功";
}
@PostMapping("/test2")
@ApiOperation("测试用户账户余额不足-异常回滚")
public String test2() {
// 商品单价10元,库存20个,用户余额50元,模拟一次性购买6个。 期望异常回滚
orderService.placeOrder(new PlaceOrderRequest(1L, 1L, 6));
return "下单成功";
}
}
3、seata序列化日期类型出错
如果业务表中存在 datetime
的数据类型,那么在分布式事务中,修改这个字段的值时,会出现如下错误。此处提供2种解决方案。
17:10:46.037 [http-nio-9204-exec-1] ERROR i.s.r.d.u.p.JacksonUndoLogParser - [encode,127] - json encode exception, Type id handling not implemented for type java.lang.Object (by serializer of type com.fasterxml.jackson.databind.ser.impl.UnsupportedTypeSerializer) (through reference chain: io.seata.rm.datasource.undo.BranchUndoLog["sqlUndoLogs"]->java.util.ArrayList[0]->io.seata.rm.datasource.undo.SQLUndoLog["afterImage"]->io.seata.rm.datasource.sql.struct.TableRecords["rows"]->java.util.ArrayList[0]->io.seata.rm.datasource.sql.struct.Row["fields"]->java.util.ArrayList[30]->io.seata.rm.datasource.sql.struct.Field["value"])
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Type id handling not implemented for type java.lang.Object (by serializer of type com.fasterxml.jackson.databind.ser.impl.UnsupportedTypeSerializer) (through reference chain: io.seata.rm.datasource.undo.BranchUndoLog["sqlUndoLogs"]->java.util.ArrayList[0]->io.seata.rm.datasource.undo.SQLUndoLog["afterImage"]->io.seata.rm.datasource.sql.struct.TableRecords["rows"]->java.util.ArrayList[0]->io.seata.rm.datasource.sql.struct.Row["fields"]->java.util.ArrayList[30]->io.seata.rm.datasource.sql.struct.Field["value"])
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276)
........
此处使用的Seata
的版本是1.4.2
3.1、数据库中不使用 datetime 数据类型
经过测试,发现 datetime
类型会出现问题,那么如果我们不使用这个类型,使用timestamp
类型代替,发现就可以解决问题。
3.2、修改undo_log默认的序列化方式
undo_log 默认的序列化方式 jackson
,在seata
中使用jackson序列化方式会有问题,那么我们修改 kryo
序列化方式就可以解决这个问题。
1、依赖文件中引入 kryo
ruoyi-cloud项目放到ruoyi-common-core/pom.xml下即可
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.42</version>
</dependency>
2、修改seata的换序列化方式 配置中心中配置client.undo.logSerialization=kryo
4、测试验证
使用Postman工具测试接口,注意观察运行日志,至此分布式事务集成案例全流程完毕。
4.1、正常下单
模拟正常下单,买一个商品 http://localhost:9201/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 1
}
4.2、库存不足
模拟库存不足,事务回滚 http://localhost:9201/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 22
}
4.3、用户余额不足
模拟用户余额不足,事务回滚 http://localhost:9201/order/placeOrder
Content-Type/application/json
{
"userId": 1,
"productId": 1,
"amount": 6
}
5、集成Nacos配置中心
1、解压seata-server-$version.zip
后修改conf/registry.conf
文件:
registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}
}
由于使用nacos
作为注册中心,所以conf
目录下的file.conf
无需理会。然后就可以直接启动bin/seata-server.bat
,可以在nacos
里看到一个名为seata-server
的服务了。
2、由于seata
使用mysql
作为db
高可用数据库,故需要在mysql
创建一个ry-seata
库,并导入数据库脚本。
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(96),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
3、config.txt
文件复制到seata目录
config.txt
service.vgroupMapping.ruoyi-system-group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/ry-seata?useUnicode=true
store.db.user=root
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
4、nacos-config.sh
复制到seata的conf目录
nacos-config.sh
#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
while getopts ":h:p:g:t:u:w:" opt
do
case $opt in
h)
host=$OPTARG
;;
p)
port=$OPTARG
;;
g)
group=$OPTARG
;;
t)
tenant=$OPTARG
;;
u)
username=$OPTARG
;;
w)
password=$OPTARG
;;
?)
echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
exit 1
;;
esac
done
urlencode() {
for ((i=0; i < ${#1}; i++))
do
char="${1:$i:1}"
case $char in
[a-zA-Z0-9.~_-]) printf $char ;;
*) printf '%%%02X' "'$char" ;;
esac
done
}
if [[ -z ${host} ]]; then
host=localhost
fi
if [[ -z ${port} ]]; then
port=8848
fi
if [[ -z ${group} ]]; then
group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
tenant=""
fi
if [[ -z ${username} ]]; then
username=""
fi
if [[ -z ${password} ]]; then
password=""
fi
nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"
echo "set nacosAddr=$nacosAddr"
echo "set group=$group"
failCount=0
tempLog=$(mktemp -u)
function addConfig() {
curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
if [[ -z $(cat "${tempLog}") ]]; then
echo " Please check the cluster status. "
exit 1
fi
if [[ $(cat "${tempLog}") =~ "true" ]]; then
echo "Set $1=$2 successfully "
else
echo "Set $1=$2 failure "
(( failCount++ ))
fi
}
count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
(( count++ ))
key=${line%%=*}
value=${line#*=}
addConfig "${key}" "${value}"
done
echo "========================================================================="
echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "
echo "========================================================================="
if [[ ${failCount} -eq 0 ]]; then
echo " Init nacos config finished, please start seata-server. "
else
echo " init nacos config fail. "
fi
5、执行命令,后面填写nacos
的IP地址,我的是本机所以是127.0.0.1
sh nacos-config.sh 127.0.0.1
成功后nacos
配置列表也能查询到相关配置
6、修改服务配置文件
# spring配置
spring:
datasource:
dynamic:
# 开启seata代理
seata: true
# seata配置
seata:
enabled: true
# Seata 应用编号,默认为 ${spring.application.name}
application-id: ${spring.application.name}
# Seata 事务组编号,用于 TC 集群名
tx-service-group: ${spring.application.name}-group
# 关闭自动代理
enable-auto-data-source-proxy: false
# 服务配置项
service:
# 虚拟组和分组的映射
vgroup-mapping:
ruoyi-system-group: default
config:
type: nacos
nacos:
serverAddr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace:
6、测试Feign服务调用
测试使用ruoyi-file
添加Feign
调用测试文件入库,验证分布式数据库调用执行结果,也适用于新的应用。
标签:seata,spring,import,NULL,com,id,cloud,log 来源: https://www.cnblogs.com/galenblog/p/16277392.html