其他分享
首页 > 其他分享> > spring cloud 分布式事务 seata

spring cloud 分布式事务 seata

作者:互联网

seata安装与部署

什么是seata?

Seata是阿里开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

官方中文文档:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

数据库事务的基本概念(ACID)

原子性(Atomicity):操作这些指令时,要么全部执行成功,要么全部不执行。只要其中一个指令执行失败,所有的指令都执行失败,数据进行回滚,回到执行指令前的数据状态。要么执行,要么不执行

一致性(Consistency):事务的执行使数据从一个状态转换为另一个状态,数据库的完整性约束没有被破坏。能量守恒,总量不变

隔离性(Isolation):隔离性是当多个用户并发访问数据库时,比如操作同一张表时,数据库为每一个用户开启的事务,不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。信息彼此独立,互不干扰

持久性(Durability):当事务正确完成后,它对于数据的改变是永久性的。不会轻易丢失

什么是分布式事务

​ 指一次大的操作由不同的小操作组成的,这些小的操作分布在不同的服务器上,分布式事务需要保证这些小操作要么全部成功,要么全部失败。从本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

​ 简单的说,在分布式系统上,一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务节点上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。

​ 举个例子:在电商网站中,用户对商品进行下单,需要在订单表中创建一条订单数据,同时需要在库存表中修改当前商品的剩余库存数量,两步操作一个添加,一个修改,我们一定要保证这两步操作一定同时操作成功或失败,否则业务就会出现问题。

为什么要使用分布式事务

​ 在微服务独立数据源的思想,每一个微服务都有一个或者多个数据源,虽然单机单库事务已经非常成熟,但是由于网路延迟和不可靠的客观因素,分布式事务到现在也还没有成熟的方案,对于中大型网站,特别是涉及到交易的网站,一旦将服务拆分微服务,分布式事务一定是绕不开的一个组件,通常解决分布式事务问题。

image

Seata为用户提供了AT、TCC、SAGA和XA事务模式,为用户打造一站式的分布式解决方案;

四种事务模式中,XA模式正在开发中...,其他事务模式已经实现;

目前使用的流行度情况是:AT > TCC > Saga;

大部分公司都采用的AT事务模式;

常见的分布式事务解决方案

1、使用MQ
2、使用LCN
3、使用Seata

Seata AT 模式

前提

整体机制

两阶段提交协议:

写隔离

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。

tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁

tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁

tx1 二阶段全局提交,释放 全局锁 。tx2 才拿到 全局锁 提交本地事务

Write-Isolation: Commit

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。

分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

Write-Isolation: Rollback

读隔离

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)

如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

Read Isolation: SELECT FOR UPDATE

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

工作机制

业务表:product

Field	Type	     Key
id	    bigint(20)	 PRI
name	varchar(100)	
since	varchar(100)	

AT 分支事务的业务逻辑:

update product set name = '123' where name = '666';

阶段一

过程:

  1. 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = '666')等相关的信息。
  2. 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = '666';

得到前镜像:

id name since
1 666 2014
  1. 执行业务 SQL:更新这条记录的 name 为 '666'。
  2. 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
select id, name, since from product where id = 1`;

得到后镜像:

id name since
1 123 2014
  1. 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
{
	"branchId": 641789253,
	"undoItems": [{
		"afterImage": {
			"rows": [{
				"fields": [
                    {"name": "id","type": 4,"value": 1}, 
                    {"name": "name","type": 12,"value": "666"}, 
                    {"name": "since","type": 12,"value": "2014"}
                ]
			}],
			"tableName": "product"
		},
		"beforeImage": {
			"rows": [{
				"fields": [
                    {"name": "id","type": 4,"value": 1}, 
                    {"name": "name","type": 12,"value": "123"}, 
                    {"name": "since","type": 12,"value": "2014"}
                ]
			}],
			"tableName": "product"
		},
		"sqlType": "UPDATE"
	}],
	"xid": "xid:xxx"
}
  1. 提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁
  2. 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
  3. 将本地事务提交的结果上报给 TC。

阶段二-回滚

  1. 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
  2. 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
  3. 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
  4. 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = '123' where id = 1;
  1. 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

阶段二-提交

  1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。

  2. 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

1、下载seata

下载地址:https://github.com/seata/seata/releases

下载seata-server-$version.zip包。


网盘地址: https://pan.baidu.com/s/1E9J52g6uW_VFWY34fHL6zA 提取码: vneh

我们是基于file的方式启动注册和承载配置的

image

1.2、启动seata

下载解压后(.zip),直接点击bin/seata-server.bat就可以了。(我使用的是1.4.0版本)

启动完成后:
image

2、使用seata

因为seata需要进行全局事务和分支事务的记录,所以需要对应的存储,目前,TC有三种存储模式:

2.1、创建相关测试数据库和表。

# 订单数据库信息 seata_order
DROP DATABASE IF EXISTS seata_order;
CREATE DATABASE seata_order;

DROP TABLE IF EXISTS seata_order.p_order;
CREATE TABLE seata_order.p_order
(
    id               INT(11) NOT NULL AUTO_INCREMENT,
    user_id          INT(11) DEFAULT NULL,
    product_id       INT(11) DEFAULT NULL,
    amount           INT(11) DEFAULT NULL,
    total_price      DOUBLE       DEFAULT NULL,
    status           VARCHAR(100) DEFAULT NULL,
    add_time         DATETIME     DEFAULT CURRENT_TIMESTAMP,
    last_update_time DATETIME     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4;

DROP TABLE IF EXISTS seata_order.undo_log;
CREATE TABLE seata_order.undo_log
(
    id            BIGINT(20) NOT NULL AUTO_INCREMENT,
    branch_id     BIGINT(20) NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT(11) NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4;
  
# 产品数据库信息 seata_product
DROP DATABASE IF EXISTS seata_product;
CREATE DATABASE seata_product;

DROP TABLE IF EXISTS seata_product.product;
CREATE TABLE seata_product.product
(
    id               INT(11) NOT NULL AUTO_INCREMENT,
    price            DOUBLE   DEFAULT NULL,
    stock            INT(11) DEFAULT NULL,![image](uploading...)
    last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4;

DROP TABLE IF EXISTS seata_product.undo_log;
CREATE TABLE seata_product.undo_log
(
    id            BIGINT(20) NOT NULL AUTO_INCREMENT,
    branch_id     BIGINT(20) NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT(11) NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4;

INSERT INTO seata_product.product (id, price, stock)
VALUES (1, 10, 20);


# 账户数据库信息 seata_account
DROP DATABASE IF EXISTS seata_account;
CREATE DATABASE seata_account;

DROP TABLE IF EXISTS seata_account.account;
CREATE TABLE seata_account.account
(
    id               INT(11) NOT NULL AUTO_INCREMENT,
    balance          DOUBLE   DEFAULT NULL,
    last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4;

DROP TABLE IF EXISTS seata_account.undo_log;
CREATE TABLE seata_account.undo_log
(
    id            BIGINT(20) NOT NULL AUTO_INCREMENT,
    branch_id     BIGINT(20) NOT NULL,
    xid           VARCHAR(100) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT(11) NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4;
INSERT INTO seata_account.account (id, balance)
VALUES (1, 50);

其中,每个库中的undo_log表,是Seata AT模式必须创建的表,主要用于分支事务的回滚。

2.2、项目引用

引入platform-common-datasource依赖(包含seata配置)

<!-- ruoyi common datasource -->
<dependency>
	<groupId>com.heyuht.hospital</groupId>
	<artifactId>platform-common-datasource</artifactId>
</dependency>

2.3、配置文件

server:
  port: 9900
# spring配置
spring:
  application:
    name: platform-seata
  redis:
    host: 192.168.2.127
    port: 6379
    password:
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        loginUsername: admin
        loginPassword: 123456
    dynamic:
      druid:
        initial-size: 5
        min-idle: 5
        maxActive: 20
        maxWait: 60000
        timeBetweenEvictionRunsMillis: 60000
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall,slf4j
        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
      datasource:
        # 主库数据源
        master:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://192.168.2.127:3366/ry-cloud?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
          username: root
          password: yyrj#2021
        # seata_order数据源
        order:
          username: root
          password: yyrj#2021
          url: jdbc:mysql://192.168.2.127:3366/seata_order?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
          driver-class-name: com.mysql.cj.jdbc.Driver
        # seata_account数据源
        account:
          username: root
          password: yyrj#2021
          url: jdbc:mysql://192.168.2.127:3366/seata_account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
          driver-class-name: com.mysql.cj.jdbc.Driver
        # seata_product数据源
        product:
          username: root
          password: yyrj#2021
          url: jdbc:mysql://192.168.2.127:3366/seata_product?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
          driver-class-name: com.mysql.cj.jdbc.Driver
      seata: true    #开启seata代理,开启后默认每个数据源都代理,如果某个不需要代理可单独关闭

# seata配置
seata:
  enabled: true
  # Seata 应用编号,默认为 ${spring.application.name}
  application-id: ${spring.application.name}
  # Seata 事务组编号,用于 TC 集群名
  tx-service-group: ${spring.application.name}-group
  # 关闭自动代理
  enable-auto-data-source-proxy: false
  # 服务配置项
  service:
    # 虚拟组和分组的映射
    vgroup-mapping:
      platform-seata-group: default
    # 分组和 Seata 服务的映射
    grouplist:
      default: 127.0.0.1:8091
  config:
    type: file
  registry:
    type: file

mybatis-plus:
  mapperLocations: classpath*:mapper/**/*Mapper.xml
  typeAliasesPackage: com.heyuht.hospital.**.domain
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

2.4、示例代码

Domain

Account.java

package com.heyuht.hospital.domain;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * 用户
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("account")
public class Account implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    /**
     * 余额
     */
    private Double balance;

    private Date lastUpdateTime;
}

Order.java

package com.heyuht.hospital.domain;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * 订单
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("p_order")
public class Order implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    /**
     * 用户ID
     */
    private Long userId;

    /**
     * 商品ID
     */
    private Long productId;

    /**
     * 订单状态
     */
    private int status;

    /**
     * 数量
     */
    private Integer amount;

    /**
     * 总金额
     */
    private Double totalPrice;

    public Order(Long userId, Long productId, int status, Integer amount)
    {
        this.userId = userId;
        this.productId = productId;
        this.status = status;
        this.amount = amount;
    }
}

Product.java

package com.heyuht.hospital.domain;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * 产品
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("product")
public class Product implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    /**
     * 价格
     */
    private Double price;
    /**
     * 库存
     */
    private Integer stock;

    private Date lastUpdateTime;
}

Dto

PlaceOrderRequest.java

package com.heyuht.hospital.domain.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PlaceOrderRequest {
    private Long userId;

    private Long productId;

    private Integer amount;
}

ReduceBalanceRequest.java

package com.heyuht.hospital.domain.dto;

import lombok.Data;

@Data
public class ReduceBalanceRequest {
    private Long userId;

    private Integer price;
}

ReduceStockRequest.java

package com.heyuht.hospital.domain.dto;

import lombok.Data;

@Data
public class ReduceStockRequest {
    private Long productId;

    private Integer amount;
}

Mapper

AccountMapper.java

package com.heyuht.hospital.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heyuht.hospital.domain.Account;

/**
 * <p>
 * 用户表 Mapper 接口
 * </p>
 *
 * @author galen
 * @since 2021-07-23
 */
public interface AccountMapper extends BaseMapper<Account> {

}

OrderMapper.java

package com.heyuht.hospital.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heyuht.hospital.domain.Order;

public interface OrderMapper  extends BaseMapper<Order> {
}

ProductMapper.java

package com.heyuht.hospital.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heyuht.hospital.domain.Product;

public interface ProductMapper extends BaseMapper<Product> {

}

Service

AccountService.java

package com.heyuht.hospital.service;

public interface AccountService {
    /**
     * 账户扣减
     *
     * @param userId 用户 ID
     * @param price  扣减金额
     */
    void reduceBalance(Long userId, Double price);
}

OrderService.java

package com.heyuht.hospital.service;

import com.heyuht.hospital.domain.dto.PlaceOrderRequest;

public interface OrderService {
    /**
     * 下单
     *
     * @param placeOrderRequest 订单请求参数
     */
    void placeOrder(PlaceOrderRequest placeOrderRequest);
}

ProductService.java

package com.heyuht.hospital.service;

public interface ProductService {
    /**
     * 扣减库存
     *
     * @param productId 商品 ID
     * @param amount    扣减数量
     * @return 商品总价
     */
    Double reduceStock(Long productId, Integer amount);
}

ServiceImpl

AccountServiceImpl.java

package com.heyuht.hospital.service.impl;


import com.baomidou.dynamic.datasource.annotation.DS;
import com.heyuht.hospital.domain.Account;
import com.heyuht.hospital.mapper.AccountMapper;
import com.heyuht.hospital.service.AccountService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Service
public class AccountServiceImpl implements AccountService {
    private static final Logger log = LoggerFactory.getLogger(AccountServiceImpl.class);

    @Resource
    private AccountMapper accountMapper;

    /**
     * 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
     */
    @DS("account")
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void reduceBalance(Long userId, Double price) {
        log.info("=============ACCOUNT START=================");
        log.info("当前 XID: {}", RootContext.getXID());

        Account account = accountMapper.selectById(userId);
        Double balance = account.getBalance();
        log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);

        if (balance < price) {
            log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
            throw new RuntimeException("余额不足");
        }
        log.info("开始扣减用户 {} 余额", userId);
        double currentBalance = account.getBalance() - price;
        account.setBalance(currentBalance);
        accountMapper.updateById(account);
        log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
        log.info("=============ACCOUNT END=================");
    }

}

OrderServiceImpl.java

package com.heyuht.hospital.service.impl;

import com.baomidou.dynamic.datasource.annotation.DS;
import com.heyuht.hospital.domain.Order;
import com.heyuht.hospital.domain.dto.PlaceOrderRequest;
import com.heyuht.hospital.mapper.OrderMapper;
import com.heyuht.hospital.service.AccountService;
import com.heyuht.hospital.service.OrderService;
import com.heyuht.hospital.service.ProductService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Service
public class OrderServiceImpl implements OrderService {
    private static final Logger log = LoggerFactory.getLogger(OrderServiceImpl.class);

    @Resource
    private OrderMapper orderMapper;

    @Autowired
    private AccountService accountService;

    @Autowired
    private ProductService productService;


    @DS("order") // 每一层都需要使用多数据源注解切换所选择的数据库
    @Override
    @Transactional
    @GlobalTransactional // 重点 第一个开启事务的需要添加seata全局事务注解
    public void placeOrder(PlaceOrderRequest request) {
        log.info("=============ORDER START=================");
        Long userId = request.getUserId();
        Long productId = request.getProductId();
        Integer amount = request.getAmount();
        log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);

        log.info("当前 XID: {}", RootContext.getXID());

        Order order = new Order(userId, productId, 0, amount);

        orderMapper.insert(order);
        log.info("订单一阶段生成,等待扣库存付款中");

        // 扣减库存并计算总价
        Double totalPrice = productService.reduceStock(productId, amount);
        // 扣减余额
        accountService.reduceBalance(userId, totalPrice);

        order.setStatus(1);
        order.setTotalPrice(totalPrice);
        orderMapper.updateById(order);
        log.info("订单已成功下单");
        log.info("=============ORDER END=================");
    }

}

ProductServiceImpl.java

package com.heyuht.hospital.service.impl;

import com.baomidou.dynamic.datasource.annotation.DS;
import com.heyuht.hospital.domain.Product;
import com.heyuht.hospital.mapper.ProductMapper;
import com.heyuht.hospital.service.ProductService;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Service
public class ProductServiceImpl implements ProductService {
    private static final Logger log = LoggerFactory.getLogger(ProductServiceImpl.class);

    @Resource
    private ProductMapper productMapper;

    /**
     * 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
     *
     * @return
     */
    @DS("product")
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public Double reduceStock(Long productId, Integer amount) {
        log.info("=============PRODUCT START=================");
        log.info("当前 XID: {}", RootContext.getXID());

        // 检查库存
        Product product = productMapper.selectById(productId);
        Integer stock = product.getStock();
        log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);

        if (stock < amount) {
            log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
            throw new RuntimeException("库存不足");
        }
        log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
        // 扣减库存
        int currentStock = stock - amount;
        product.setStock(currentStock);
        productMapper.updateById(product);
        double totalPrice = product.getPrice() * amount;
        log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
        log.info("=============PRODUCT END=================");
        return totalPrice;
    }

}

Controller

OrderController.java

package com.heyuht.hospital.controller;

import com.heyuht.hospital.service.OrderService;
import com.heyuht.hospital.domain.dto.PlaceOrderRequest;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/order")
public class OrderController {
    @Autowired
    private OrderService orderService;

    @PostMapping("/placeOrder")
    public String placeOrder(@Validated @RequestBody PlaceOrderRequest request) {
        orderService.placeOrder(request);
        return "下单成功";
    }

    @PostMapping("/test1")
    @ApiOperation("测试商品库存不足-异常回滚")
    public String test1() {
        // 商品单价10元,库存20个,用户余额50元,模拟一次性购买22个。 期望异常回滚
        orderService.placeOrder(new PlaceOrderRequest(1L, 1L, 22));
        return "下单成功";
    }

    @PostMapping("/test2")
    @ApiOperation("测试用户账户余额不足-异常回滚")
    public String test2() {
        // 商品单价10元,库存20个,用户余额50元,模拟一次性购买6个。 期望异常回滚
        orderService.placeOrder(new PlaceOrderRequest(1L, 1L, 6));
        return "下单成功";
    }
}

3、seata序列化日期类型出错

如果业务表中存在 datetime 的数据类型,那么在分布式事务中,修改这个字段的值时,会出现如下错误。此处提供2种解决方案。

17:10:46.037 [http-nio-9204-exec-1] ERROR i.s.r.d.u.p.JacksonUndoLogParser - [encode,127] - json encode exception, Type id handling not implemented for type java.lang.Object (by serializer of type com.fasterxml.jackson.databind.ser.impl.UnsupportedTypeSerializer) (through reference chain: io.seata.rm.datasource.undo.BranchUndoLog["sqlUndoLogs"]->java.util.ArrayList[0]->io.seata.rm.datasource.undo.SQLUndoLog["afterImage"]->io.seata.rm.datasource.sql.struct.TableRecords["rows"]->java.util.ArrayList[0]->io.seata.rm.datasource.sql.struct.Row["fields"]->java.util.ArrayList[30]->io.seata.rm.datasource.sql.struct.Field["value"])
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Type id handling not implemented for type java.lang.Object (by serializer of type com.fasterxml.jackson.databind.ser.impl.UnsupportedTypeSerializer) (through reference chain: io.seata.rm.datasource.undo.BranchUndoLog["sqlUndoLogs"]->java.util.ArrayList[0]->io.seata.rm.datasource.undo.SQLUndoLog["afterImage"]->io.seata.rm.datasource.sql.struct.TableRecords["rows"]->java.util.ArrayList[0]->io.seata.rm.datasource.sql.struct.Row["fields"]->java.util.ArrayList[30]->io.seata.rm.datasource.sql.struct.Field["value"])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276)
    ........

此处使用的Seata的版本是1.4.2

3.1、数据库中不使用 datetime 数据类型

经过测试,发现 datetime 类型会出现问题,那么如果我们不使用这个类型,使用timestamp类型代替,发现就可以解决问题。

3.2、修改undo_log默认的序列化方式

undo_log 默认的序列化方式 jackson,在seata中使用jackson序列化方式会有问题,那么我们修改 kryo 序列化方式就可以解决这个问题。

1、依赖文件中引入 kryo

ruoyi-cloud项目放到ruoyi-common-core/pom.xml下即可

<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.42</version>
</dependency>

2、修改seata的换序列化方式 配置中心中配置client.undo.logSerialization=kryo

image

4、测试验证

使用Postman工具测试接口,注意观察运行日志,至此分布式事务集成案例全流程完毕。

4.1、正常下单

模拟正常下单,买一个商品 http://localhost:9201/order/placeOrder

Content-Type/application/json

{
    "userId": 1,
    "productId": 1,
    "amount": 1
}

4.2、库存不足

模拟库存不足,事务回滚 http://localhost:9201/order/placeOrder

Content-Type/application/json

{
    "userId": 1,
    "productId": 1,
    "amount": 22
}

4.3、用户余额不足

模拟用户余额不足,事务回滚 http://localhost:9201/order/placeOrder

Content-Type/application/json

{
    "userId": 1,
    "productId": 1,
    "amount": 6
}

5、集成Nacos配置中心

1、解压seata-server-$version.zip后修改conf/registry.conf文件:

registry {
  type = "nacos"
  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  type = "nacos"
  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
  }
}

由于使用nacos作为注册中心,所以conf目录下的file.conf无需理会。然后就可以直接启动bin/seata-server.bat,可以在nacos里看到一个名为seata-server的服务了。 seata

2、由于seata使用mysql作为db高可用数据库,故需要在mysql创建一个ry-seata库,并导入数据库脚本。

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

3、config.txt文件复制到seata目录

config.txt

service.vgroupMapping.ruoyi-system-group=default
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/ry-seata?useUnicode=true
store.db.user=root
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

4、nacos-config.sh复制到seata的conf目录

nacos-config.sh

#!/usr/bin/env bash
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

while getopts ":h:p:g:t:u:w:" opt
do
  case $opt in
  h)
    host=$OPTARG
    ;;
  p)
    port=$OPTARG
    ;;
  g)
    group=$OPTARG
    ;;
  t)
    tenant=$OPTARG
    ;;
  u)
    username=$OPTARG
    ;;
  w)
    password=$OPTARG
    ;;
  ?)
    echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
    exit 1
    ;;
  esac
done

urlencode() {
  for ((i=0; i < ${#1}; i++))
  do
    char="${1:$i:1}"
    case $char in
    [a-zA-Z0-9.~_-]) printf $char ;;
    *) printf '%%%02X' "'$char" ;;
    esac
  done
}

if [[ -z ${host} ]]; then
    host=localhost
fi
if [[ -z ${port} ]]; then
    port=8848
fi
if [[ -z ${group} ]]; then
    group="SEATA_GROUP"
fi
if [[ -z ${tenant} ]]; then
    tenant=""
fi
if [[ -z ${username} ]]; then
    username=""
fi
if [[ -z ${password} ]]; then
    password=""
fi

nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"
echo "set group=$group"

failCount=0
tempLog=$(mktemp -u)
function addConfig() {
  curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
  if [[ -z $(cat "${tempLog}") ]]; then
    echo " Please check the cluster status. "
    exit 1
  fi
  if [[ $(cat "${tempLog}") =~ "true" ]]; then
    echo "Set $1=$2 successfully "
  else
    echo "Set $1=$2 failure "
    (( failCount++ ))
  fi
}

count=0
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
  (( count++ ))
	key=${line%%=*}
    value=${line#*=}
	addConfig "${key}" "${value}"
done

echo "========================================================================="
echo " Complete initialization parameters,  total-count:$count ,  failure-count:$failCount "
echo "========================================================================="

if [[ ${failCount} -eq 0 ]]; then
	echo " Init nacos config finished, please start seata-server. "
else
	echo " init nacos config fail. "
fi

5、执行命令,后面填写nacos的IP地址,我的是本机所以是127.0.0.1

sh nacos-config.sh 127.0.0.1

成功后nacos配置列表也能查询到相关配置 nacos

6、修改服务配置文件

# spring配置
spring: 
  datasource:
    dynamic:
      # 开启seata代理
      seata: true
	  
# seata配置
seata:
  enabled: true
  # Seata 应用编号,默认为 ${spring.application.name}
  application-id: ${spring.application.name}
  # Seata 事务组编号,用于 TC 集群名
  tx-service-group: ${spring.application.name}-group
  # 关闭自动代理
  enable-auto-data-source-proxy: false
  # 服务配置项
  service:
    # 虚拟组和分组的映射
    vgroup-mapping:
      ruoyi-system-group: default
  config:
    type: nacos
    nacos:
      serverAddr: 127.0.0.1:8848
      group: SEATA_GROUP
      namespace:
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:

6、测试Feign服务调用

测试使用ruoyi-file添加Feign调用测试文件入库,验证分布式数据库调用执行结果,也适用于新的应用。

image

标签:seata,spring,import,NULL,com,id,cloud,log
来源: https://www.cnblogs.com/galenblog/p/16277392.html