其他分享
首页 > 其他分享> > 三种分布式事务LCN、Seata、MQ

三种分布式事务LCN、Seata、MQ

作者:互联网

LCN

TxLCN分布式事务框架,

多个service模块操纵同一个数据源,

也有可能存在一个service 横跨多种数据源节点的可能

要么都成功,要么都失败。

由TxLCN解决。

常见的解决方案

 

CAP定理:一个分布式系统中Consistency(一致性) Availability(可用性) Partition tolerance(分区容错性),最多同时满足其中两个,三个不可兼得。

可用性和一致性是矛盾的,(一个节点坏了还得有别的节点来承担)。

分区容错性意思是,系统不能在一定时限内达到数据的一致性,就意味着发生了分区的情况。这时候必须要在A和C做出选择。

BASE理论:CAP理论的优化。Basically Available(基本可用 )、soft state(软状态),Eventually consistent(最终一致性)三个短语的缩写。

即使无法做到强一致,每个应用都可以根据自身业务特点,采用适当的方式达到最终一致性。

BA基本可用:指分布式系统中出现不可知故障的时候,允许损失部分可用性,但不代表整个系统不可用。(比如秒杀活动的并发降级页面)

S软状态:系统中数据允许存在中间状态(软状态),并认为这个状态不影响可用性,允许分布式节点之间存在同步延迟。(如Eureka集群同步数据)

最终一致性:允许整个系统数据在经过一定时间后,最终能达到整个系统的一致性。为弱一致性,响应给用户结果时整个系统没有达到一致性,但最终一定会达到一致性的。

强一致性:系统接受请求后,整个系统必须达到一致的结果才能响应。

LCN来源

三种事务模式

TxLCN原理

两大模块组成,TxClient TxManager;

TxClient为模块的依赖框架,提供了Tx-LCN的标准支持,事务发起方和参与方都属于TxClient

TxManager负责控制整个事务。

image-20211111113238348

123代表三个txClient将自己的事务发布到TxManager,4为发起方告诉txManager获取数据成功或者失败,如果成功(其中一个失败也算失败),TxManager返回通知AB让他们各自提交自己的事务。5为txManager通知发起方提交自己的事务。

创建事务组:事务发起方在开始执行业务代码之前先调用txManager创建事务组对象,拿到事务表示GroupId。

加入事务组:2和3,参与方将自己的事务执行情况报备给txManager

通知事务组:发起方执行完业务代码之后,将执行结果状态通知给txManager

通知事务单元:txManager询问参与方的事务执行结果

响应通知事务组:txManager通知发起方事务的最终结果

新建TxManager项目,充当管理者

执行依赖中带有的tx-manager.sql脚本文件

配置application.properties

spring.application.name=TransactionManager
server.port=7970
​
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/tx-manager?characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=123456
spring.redis.host=192.168.234.100
​
tx-lcn.logger.enabled=false

 

启动类加上@EnableTranactionManagerServer

遇上servertimezone问题添加serverTimezone=GMT

访问

如果打开事务:更改 tx-lcn.logger.enabled=true

依赖:

<dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.codingapi.txlcn</groupId>
            <artifactId>txlcn-tm</artifactId>
            <version>5.0.2.RELEASE</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

新建teacher和student student的tid依赖于teacher 只有teacher插入成功了s才能插入成功,先不要添加外键。

用两个不同的服务去添加不同的表数据

现在添加student的过程中,在添加老师数据之后出现了错误(sout(0)),那么会呈现老师的添加已经提交但是学生的添加没有提交

这时候只在学生的添加方法上@transactional是不够的,@transactional只限于本地事务。而这是分布式事务。

参与者配置

引入依赖

<dependency>
    <groupId>com.codingapi.txlcn</groupId>
    <artifactId>txlcn-tc</artifactId>
    <version>5.0.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.codingapi.txlcn</groupId>
    <artifactId>txlcn-txmsg-netty</artifactId>
    <version>5.0.2.RELEASE</version>
</dependency>

在teacher和student的插入方法上方

@Transavtional
@LcnTransactional//(propogation为隔离级别)

在teacher和student的启动类上方加上

@EnableDistributedTransaction

在teacher和student的application.properties文件中设置地址

#配置事务管理器TxManager的事务端口
tx-lcn.client.manager-address: 127.0.0.1:8070

Tcc(非事务型数据库举例)

docker container ls -a
docker start mango

假设老师要将数据存储到mangodb

添加以下依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!-- 数据源-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--数据源-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.2.1</version>
</dependency>
<!--连接驱动 -->
<dependency>
    <groupId>mysql</groupId></groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
​

修改连接方式配置

spring.data.mongodb.host=192.168.234.100
spring.data.mongodb.authentication-database=admin
spring.data.mongodb.username=lc
spring.data.mongodb.password=123456
spring.data.mongodb.database=ego

 

修改pojo类

在插入方法上方自动装配

@MongoTemplate mongoTemplate

其插入方法去掉@Transactional

TCC模式需要开发者手动处理,所以TCC也叫代码补偿业务,mongodb没有事务,需要手动处理删除,分别由三个方法:

@Autowired
@MongoTemplate mongoTemplate
@TccTransaction
 public int insert(Teacher teacher){
     System.out.println("尝试新增数据")
    Teacher result = mongoTemplate.insert(teache);
    if(result!=null){return 1;}
    return 0;
    
 }
 public void confirmInsert(Teacher teacher){
     System.out.println("teacher="+teacher)
     //只是有确认的作用
 }
 public void cancelInsert(Teacher teacher){
     //这个teacher是带有数据的
     Criteria criteria = Criteria.where("id").is(teacher.getId());
     Query query=new Query(criteria);
     DeleteResult deleteResut=mongoTemplate.remove(query,Teacher.class);
     System.out.println("deleteResut.getDeletedCount="+deleteResut.getDeletedCount())
 }

student方不需要加@TccTransaction,仍然还是lcn(没有更改数据库)

Seata

数据存储在不同的数据服务器上。

软件:组成部分

Transaction Coordinator(TC):事务协调器,维护全局事务的运行状态,相当于Seata的server

Transaction Manager(TM):事务管理器。负责开启一个全局事务,并最终发起全局提交或全局回滚

Resource Manger(RM) 资源管理器,向事务管理器注册分支事务和并报告分支事务的状态,负责分支事务提交或回滚

事务控制流程:

  1. TM向TC申请开启全局事务(@GlobalTransaction)

  2. TC向TM返回全局事务的唯一XID

  3. 分支服务①向TC申请注册分支服务,TC向分支服务①返回BranchID

  4. 分支服务①进行操作(RM),记录操作日志undo_log,提交本地分支事务,向TC上报事务处理结果

  5. 分支服务①远程调用分支服务②

  6. 分支服务进行与①一样的操作(RM),得到唯一的BranchID,undo_log以及上报

  7. 分支服务处理完毕后,TM向TC提交全局事务决议,询问是否全部执行成功。

  8. 如果事务出现问题,反向操作undo_log回滚本地事务。

Seata流程

添加依赖:

<seata.version>1.4.2</seata.version>

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>${seata.version}</version>
</dependency>

https://seata.io/en-us/blog/download.html 下载好对应版本的seata server

解压后,拿到\seata\conf目录下的file.conf 和 registry.conf

修改file.conf下的mod以及对应的url,driverClassName 目前mode有三个选项 file、db、redisfile.conf 文件配置

在选择 db 方式后,需要在对应数据库创建 globalTable(持久化全局事务)、branchTable(持久化各提交分支的事务)、 lockTable(持久化各分支锁定资源事务)三张表

-- the table to store GlobalSession data
-- 持久化全局事务
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
-- 持久化各提交分支的事务
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
-- 持久化每个分支锁表事务
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

registry.conf 文件设置 注册中心 和 配置中心,我选择了都放在nacos(需要额外的步骤,参见https://blog.csdn.net/qq853632587/article/details/111644295)

registry.conf 文件配置

https://link.csdn.net/?target=https%3A%2F%2Fgithub.com%2Fseata%2Fseata%2Fblob%2Fdevelop%2Fscript%2Fconfig-center%2Fnacos%2Fnacos-config.sh

下载好的nacos-config.sh放在conf目录下

https://github.com/seata/seata/blob/develop/script/config-center/config.txt

config.txt修改

config.txt就是seata各种详细的配置,执行 nacos-config.sh 即可将这些配置导入到nacos,这样就不需要将file.conf和registry.conf放到我们的项目中了,需要什么配置就直接从nacos中读取。

在conf文件git bash

sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 0af6e97b-a684-4647-b696-7c6d42aecce7 -u nacos -w nacos

每个服务都搭建配置

spring:
    application:
        name: application_name
    datasource:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/seata
        username: root
        password: 123456

seata:
  enabled: true
  enable-auto-data-source-proxy: true
  tx-service-group: my_test_tx_group
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      username: nacos
      password: nacos
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      username: nacos
      password: nacos
      namespace: 自己的namespace
  service:
    vgroup-mapping:
      my_test_tx_group: default
    disable-global-transaction: false
  client:
    rm:
      report-success-enable: false

 

在业务逻辑层的方法(不同方法)上方增加@ GlobalTransactional

MQ事务消息

有一些第三方的MQ是支持事务消息的,如RocketMQ。但RabbitMQ和Kafka都不支持。支持事务消息的方式也是类似于二阶段提交。

举例:让支付宝账户扣除一万,我们生成一个凭证(消息),这个消息写着让余额宝账户增加一万。我们可以拿这个消息完成最终一致性。

消息解耦

  1. 支付宝在扣款事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不真正发送(只是知道有这一条消息),只有消息发送成功后才会提交事务

  2. 先把(支付宝-10000)封装成一个消息(new Message())),后把这个消息提交到MQ服务器上,当本地事务操作完成了以后,要么成功:(给MQ一个标识:COMMIT),要么失败:(给MQ一个标识:ROLLBACK)

send(producer.send(new Message(),callback(里面处理本地事务)))
//在callback处理本地事务:在callback方法里:
update A set amount = amount - 10000 where userId = 1;
  1. 当支付宝扣款事务被成功提交后,向实时消息服务确认发送,只有在得到确认发送指令后,实时消息服务才真正发送该消息。

  2. 当支付宝扣款事务提交失败回滚后,向实时消息服务取消发送。消息就不会被发送。

  3. 对于那些未确认的消息或者取消的消息,需要有一个消息状态确认系统定时去支付宝系统查询这个消息的状态进行更新。

    优点:消息数据独立存储,降低业务系统与消息系统之间的耦合

    缺点:一次消息发送需要两次请求;业务处理服务需要实现消息状态回查接口。

img

 

标签:事务,VARCHAR,Seata,spring,nacos,MQ,提交,LCN,id
来源: https://www.cnblogs.com/anboxu/p/15602885.html