其他分享
首页 > 其他分享> > Springboot+Cloud+Nacos+Seata整合分布式事务

Springboot+Cloud+Nacos+Seata整合分布式事务

作者:互联网

一.项目架构

Springboot: 2.3.3.RELEASE

Springcloud: Hoxton.SR7

JDK: 11

Seata: 1.4.0

Mysql: 8.0.21

Nacos: 1.3.2

这里只介绍Seata的安装及配置步骤,其他的服务如Mysql、Nacos等略过,大家可以自行安装

二.安装Seata

1.下载

seata: https://github.com/seata/seata/releases

根据版本下载服务端,项目使用的是1.4.0版本

下载完成后解压文件结构如下

bin:启动命令

conf:配置文件

lib:依赖包等

 

2.配置

进入到conf文件夹

这里一般我们需要修改的文件有两个:

file.conf 

registry.conf

file.conf

项目里需要修改的地方是数据源的配置,store数据仓库配置的方式有三种 file db redis,项目里使用的是mysql,所以设置mode="db",这里需要注意的是不同的Mysql版本需要配置对应的驱动

因为项目用的是Mysql是8.0.21,所以驱动设置的是 driverClassName = "com.mysql.cj.jdbc.Driver"

数据库配置用的druid,所以设置datasource = "druid"

然后就可以设置数据库的账号、密码、使用的数据库等

到这里file.conf就配置好了,其他的配置大家可以自己根据需要再进行配置

registry.conf

项目里seata服务端的配置文件读的本地,所以只需要更改registry的配置

registry支持的类型比较多,大家可以根据实际情况来选择,项目里因为整合了Nacos,所以这里我们设置 type="nacos"

nacos的配置

application:服务名称

sercerAddr:Nacos的ip+port,比如:127.0.0.1:8848

group:Nacos分组,比如:DEFAULT_GROUP

namespace:Nacos空间别名,根据实际情况配置

3.初始化数据库脚本

脚本下载:https://github.com/seata/seata/tree/develop/script/server/db

这里选择对应的数据库脚本,下载完成后在配置的数据里执行即可(项目里是单独给seata配置了一个数据源)

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

执行完成后,会初始化下面三张表给服务端使用

4.启动Seata服务端

通过上面的配置,我们就可以进入刚才解压好的seata文件的bin目录中

window 启动脚本(双击即可)->seata-server.bat

linux 启动脚本->seata-server.sh

以window为例,双击seata-server.bat后弹出命令框,提示

Server started, listen port: 8091表示本地启动成功

查看Nacos服务列表,在对应的命名空间下看到有对应的Seata服务端已经注册到Nacos上了,到此Seata服务端启动完成

 

三、服务端配置

1.项目中模拟分布式事务的业务场景

测试中涉及到四个服务 manage member order product

业务流(这里只是简单模拟,不是真实业务场景):

manage提供测试入口,依次调用product商品服务扣减库存,member会员服务扣减积分,order订单服务创建订单,模拟异常查看测试结果

2.配置说明

a.manage member order product 的配置文件中需要指定

seata.tx-service-group
seata.tx-service-group=my_test_tx_group(默认的)

注意这里的 my_test_tx_group 必须要服务端的一致,如果需要改这个配置,服务端也必须要改

b.pom.xml

manage member order product 的pom文件分别需要导入seata的依赖jar(项目使用的Maven)

spring-cloud-starter-alibaba-seata
seata-spring-boot-starter
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <version>2.2.3.RELEASE</version>
            <exclusions>
                <exclusion`>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.0</version>
        </dependency>

刚开始引入的时候,项目里报cglib依赖冲突的错误,最后查询资料上说是因为项目里整合mybatis,mybatis的cglib需要2.x.x的版本,否则就会冲突

最后修改为如下红色所示,如果没有出现依赖冲突,按照上面的配置即可,如果出现cglib冲突,可以参考下面的配置

c.数据库初始化

下载地址: https://github.com/seata/seata/blob/develop/script/client/at/db/mysql.sql

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

因为项目是分库的,所以需要在每个微服务对应的数据源中初始化一个自己的undo_log表

 

结合项目的业务场景需要初始化三张简单的业务表

member_test: 会员表->用于扣减积分

order_test: 订单表->用于创建订单

product_test: 商品表->用户扣减库存

 

CREATE TABLE `member_test` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `member_id` int NOT NULL DEFAULT '0' COMMENT '会员id',
  `integral` int NOT NULL DEFAULT '0' COMMENT '积分',
  PRIMARY KEY (`id`),
  UNIQUE KEY `un_member_id` (`member_id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;


CREATE TABLE `order_test` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_no` varchar(50) NOT NULL COMMENT '订单编号',
  `member_id` int NOT NULL DEFAULT '0' COMMENT '会员id',
  `product_id` int NOT NULL COMMENT '商品id',
  `quantity` int NOT NULL DEFAULT '0' COMMENT '数量',
  `amount` decimal(12,2) NOT NULL DEFAULT '0.00' COMMENT '金额',
  PRIMARY KEY (`id`),
  UNIQUE KEY `un_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `product_test` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `product_name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
  `inventory` int NOT NULL DEFAULT '0' COMMENT '库存',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

4.项目部署

manage: 统一服务管理,测试入口

Controller

package intelligence.core.manage.controller.test;

import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.manage.service.test.TransactionService;
import intelligence.library.common.annotation.AuthIgnore;
import intelligence.library.common.vo.base.ResponseVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author intelligence
 */
@Api(tags = {"事务测试管理"})
@RestController
@RequestMapping("/api/${api.name}/seata")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TransactionController {
    private final TransactionService transactionService;

    @ApiOperation("测试")
    @PostMapping("test")
    @AuthIgnore
    public ResponseVo test(@RequestBody SeataTestDto dto) {
        return transactionService.test(dto);
    }
}

Dto

package intelligence.core.dependent.dto.test;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author intelligence
 */
@Data
@ApiModel
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SeataTestDto {
    @ApiModelProperty("会员id")
    private int memberId;
    @ApiModelProperty("商品id")
    private int productId;
    @ApiModelProperty("商品数量")
    private int quantity;
}

Service

这里的一个关键注解用于开启全局事务

@GlobalTransactional 用户统一入口的全局事务管理,项目加在manage上

@Transactional 各个微服务方法上添加此注解用于开启本地事务,项目里加在 meber product order三个服务对应的方法上

这里分别注入 member product order的client用于服务间的调用(项目整合了Feign用来实现微服务调用)

package intelligence.core.manage.service.test;

import intelligence.core.dependent.client.member.MemberTestClient;
import intelligence.core.dependent.client.order.OrderTestClient;
import intelligence.core.dependent.client.product.ProductTestClient;
import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.library.common.dto.exception.CommonException;
import intelligence.library.common.vo.base.ResponseVo;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author intelligence
 */
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TransactionService {
    private final OrderTestClient orderTestClient;
    private final ProductTestClient productTestClient;
    private final MemberTestClient memberTestClient;

    @GlobalTransactional(rollbackFor = Exception.class)
    public ResponseVo test(SeataTestDto dto) {
        //扣减库存
        ResponseVo productVo = productTestClient.test(dto);
        if (!productVo.ok()) {
            throw CommonException.from("扣减库存失败");
        }
        //扣减积分
        ResponseVo memberVo = memberTestClient.test(dto);
        if (!memberVo.ok()) {
            throw CommonException.from("扣减积分失败");
        }
        //创建订单
        ResponseVo orderVo = orderTestClient.test(dto);
        if (!orderVo.ok()) {
            throw CommonException.from("创建订单失败");
        }
        //模拟异常
        throw CommonException.from("创建订单失败");
    }
}

member: 会员服务

package intelligence.core.member.service;

import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.dependent.entity.member.MemberTest;
import intelligence.core.member.mapper.MemberTestMapper;
import intelligence.library.common.dto.exception.CommonException;
import intelligence.library.common.utils.base.ResponseUtils;
import intelligence.library.common.vo.base.ResponseVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author intelligence
 */
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MemberTestService {
    private final MemberTestMapper memberTestMapper;

    @Transactional(rollbackFor = Exception.class)
    public ResponseVo test(SeataTestDto dto) {
        MemberTest memberTest = memberTestMapper.findByMemberId(dto.getMemberId());
        if (null == memberTest) {
            throw CommonException.from("会员积分记录不存在");
        }
        if (dto.getQuantity() * 10 > memberTest.getIntegral()) {
            throw CommonException.from("会员积分不足");
        }
        memberTest.setIntegral(memberTest.getIntegral() - dto.getQuantity() * 10);
        memberTestMapper.update(memberTest);
        return ResponseUtils.success();
    }
}

order: 订单服务

package intelligence.core.order.service.service;

import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.dependent.entity.order.OrderTest;
import intelligence.core.order.mapper.order.OrderTestMapper;
import intelligence.library.common.utils.base.RandomUtils;
import intelligence.library.common.utils.base.ResponseUtils;
import intelligence.library.common.vo.base.ResponseVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

/**
 * @author intelligence
 */
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderTestService {
    @Value("${order.prefix}")
    private String orderPreFix;

    private final OrderTestMapper orderTestMapper;

    @Transactional(rollbackFor = Exception.class)
    public ResponseVo test(SeataTestDto dto) {
        OrderTest orderTest = OrderTest.builder()
                .amount(BigDecimal.ONE)
                .memberId(dto.getMemberId())
                .orderNo(orderPreFix + RandomUtils.getOrderCode(16))
                .productId(dto.getProductId())
                .quantity(dto.getQuantity())
                .build();
        orderTestMapper.insert(orderTest);
        return ResponseUtils.success();
    }
}

product: 商品服务

package intelligence.core.product.service.product;

import intelligence.core.dependent.dto.test.SeataTestDto;
import intelligence.core.dependent.entity.product.ProductTest;
import intelligence.core.product.mapper.product.ProductTestMapper;
import intelligence.library.common.dto.exception.CommonException;
import intelligence.library.common.utils.base.ResponseUtils;
import intelligence.library.common.vo.base.ResponseVo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * @author intelligence
 */
@Service
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductTestService {
    private final ProductTestMapper productTestMapper;

    @Transactional(rollbackFor = Exception.class)
    public ResponseVo test(SeataTestDto dto) {
        ProductTest productTest = productTestMapper.findById(dto.getProductId());
        if (null == productTest) {
            throw CommonException.from("商品信息不存在");
        }
        if (dto.getQuantity() > productTest.getInventory()) {
            throw CommonException.from("商品可用库存不足");
        }
        productTest.setInventory(productTest.getInventory() - dto.getQuantity());
        productTestMapper.update(productTest);
        return ResponseUtils.success();
    }
}

启动manage product member order服务

transactionServiceGroup='my_test_tx_group' 为我们刚配置的事务组,看到如下内容表示事务注册成功

查看Nacos服务列表,包含Seata的服务端,所有测试的服务已经全部注册好了

五、测试

模拟场景: id=1的会员 购买1个id=1的商品

这里只是简单的模拟一个异常场景下的全局事务回滚

会员1有10积分

商品1有一个库存

订单数据为空

 

发送请求

1.这时Seata数据源中的global_table中有一条全局事务的记录,

2.当库存扣减成功后(进入到扣减积分逻辑),查看商品库存表的库存已经变成了0,商品数据源中的undo_log中也有一条分支事务的记录

3.积分扣减成功后(进入到创建订单逻辑),查看会员的积分已经变成了0(扣了10积分),会员数据源中的undo_log中也有一条分支事务的记录

4.创建订单后,订单表中多了一条订单记录,订单数据源中的undo_log中也有一条分支事务的记录

5.模拟异常,全局事务回滚,商品库存回滚为1,会员积分回滚为10,订单数据清空,全局事务处理成功

 

 

标签:dto,Springboot,intelligence,seata,Nacos,test,import,id,Seata
来源: https://blog.csdn.net/weixin_40723662/article/details/111163507