其他分享
首页 > 其他分享> > Seata Transaction Manager

Seata Transaction Manager

作者:互联网

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 Transaction Manager 的实现。

TM

TM 和 TC 一样是一个共通的模块, 无论是 AT 模式还是 TCC 模式都需要使用 TM 模块。

首先 TM 在启动的时候会去连接 TC Server, 然后然后通过该 TM Client 与 TC 模块进行通讯。在 TM 模块中最核心的接口就是 GlobalTransaction, 里面包含了全局事务的创建, 提交, 回滚过程, 其实质就是向 TC 发送 RPC 请求。

public class DefaultGlobalTransaction implements GlobalTransaction {
    // 只保留核心内容...
    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            check();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid != null) {
            throw new IllegalStateException();
        }
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [" + xid + "]");
        }

    }

    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid == null) {
            throw new IllegalStateException();
        }

        status = transactionManager.commit(xid);
        if (RootContext.getXID() != null) {
            if (xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[" + xid + "] commit status:" + status);
        }

    }

    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [" + xid + "]");
            }
            return;
        }
        if (xid == null) {
            throw new IllegalStateException();
        }

        status = transactionManager.rollback(xid);
        if (RootContext.getXID() != null) {
            if (xid.equals(RootContext.getXID())) {
                RootContext.unbind();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[" + xid + "] rollback status:" + status);
        }
    }
}

我们可以看到, 这个接口中实际上没做什么实际的事, 它调用 transactionManager 发送消息, 然后将涉及到的全局事务 XID 保存起来, 我们看看它把数据存在什么地方了:

public class ThreadLocalContextCore implements ContextCore {

    private ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>() {
        @Override
        protected Map<String, String> initialValue() {
            return new HashMap<String, String>();
        }

    };

    @Override
    public String put(String key, String value) {
        return threadLocal.get().put(key, value);
    }

    @Override
    public String get(String key) {
        return threadLocal.get().get(key);
    }

    @Override
    public String remove(String key) {
        return threadLocal.get().remove(key);
    }
}

看上去, 它是将 XID 存在了 ThreadLocal 中, 这样在整个 RPC 调用的上下文中都能获取到 XID。接下来, 我们看看下层的 transactionManager 都做了什么:

public class DefaultTransactionManager implements TransactionManager {
    // All PRCs here
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
        return response.getGlobalStatus();
    }
}

可以看到, TransactionManager 才是真正做实事的, 消息的发送工作都在这里完成。好了, 至此我们知道了哪个接口管理着全局事务的记录, 哪个接口真正进行 RPC 调用, 那么谁才是这些接口的真正调用者呢? Seata 使用了模板方法模式来进行这部分工作:

public class TransactionalTemplate {

    /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }
}

该模板的工作流程如下:

  1. 看看当前是不是已经在一个分布式事务中了, 如果是, 则复用现存的全局事务, 否则创建新的
    • 什么时候会出现已经存在全局事务的情况呢? 假设 A 调用了 B, A 创建了全局事务 GT1, B 碰巧也执行了上述的模板, 这时候 B 就不会创建新的全局事务, 而是使用 GT1, 这实际上是前面提到的事物的传播
  2. 如果是自己创建的全局事务, 则发 RPC 开始事务, 如果不是自己创建的则什么都不干
  3. 执行真正的业务逻辑
  4. 如果发生了异常, 如果自己创建全局事务, 才负责回滚, 否则就只管异常外抛
  5. 如果没发生异常, 如果自己创建全局事务, 才负责提交, 否则就什么都不做
  6. 清理工作

我们看到, 该模板实际上是业务服务的完整执行流程, 那我们每次都要自己在代码中通过该模板接口执行自己的业务代码吗? 当然不用, 实际上 Seata 基于 Spring 切面, 已经帮我们做了这些事, 我们只需要使用 GlobalTransactional 注解就够了, 接下来我们看看这部分内容:

public class GlobalTransactionalInterceptor implements MethodInterceptor {
    // 只保留核心代码
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }

    private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
        return globalLockTemplate.execute(() -> {
            try {
                return methodInvocation.proceed();
            } catch (Throwable e) {
                if (e instanceof Exception) {
                    throw (Exception)e;
                } else {
                    throw new RuntimeException(e);
                }
            }
        });
    }

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

            }
        }
    }
}

我们可以看到, 上面的就是全局事务的拦截器, 它扫描 GlobalTransactionalGlobalLock, 如果是 GlobalTransactional 则用 transactionalTemplate 来执行真正的业务代码, 此外还从注解中拿出配置的回滚条件, 超时时间等配置, 给 transactionalTemplate 使用。

大家也看到了这个拦截器中, 还有一个 globalLockTemplate, 实际上这个是在 RM 中使用的, 至于为什么, 使用我们在前面的理论环节已经介绍了, 这里就不赘述了, 而且该模板代码也很简单, 就是加锁->执行->放锁, 并且这里的加锁和放锁只是改变 Context 中的标志位, 真正通过 RPC 进行锁确认的过程, 我们后面会介绍。

至此, 我们知道了 TM 是如何觉察到全局事务需求(GlobalTransactional 注解), 如何创建全局事务(RPC 调用 TC 接口), 如何通过模板方法模式完成对业务的封装(GlobalTransactionTemplate)。那么还有一个问题, 事务信息是怎么传递的呢, TM 怎么将全局事务 XID 传递给 RPC 的提供者的呢? 这部分, 根据 RPC 框架的不同, 需要不同的实现, 但是本质上都是一样的, 拦截 RPC 的调用过程, 在 RPC 请求中加一个隐藏属性来存储 XID, RPC 的提供方从请求中获取到该隐藏属性, 然后存储在事务 Context 的 ThreadLocal 中, 我们以 Dubbo 为例, 看一看它是怎么做的:

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER}, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            // 如果当前存在 xid 说明是 RPC 发起方, 将 xid 存在 RPC context 中, 它会随着 RPC request 一起发送
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            // 否则他就是 RPC 提供方, 它从 rpc context 中拿到 xid, 并设置到事务 context 中
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);

        } finally {
            // 最后清除事务 context 中绑定的 xid, 防止 ThreadLocal 内容污染下次调用
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}

看过我之前发的 Dubbo系列文章 的同学, 可能还记得 Dubbo 是通过一个 Filter 的概念来表示 AOP 特性的,Filter 的注入基于 Dubbo 的 SPI, 而 Seata 这里所做的就是实现了一个 Dubbo Filter, 把事务 Context 和 RPCContext 中的数据做一下绑定。其他 RPC 框架的支持方案基本类似, 这里就不再赘述了, 值得一提的是, 如果全局事务中的各个微服务是通过 HTTP 请求来互相调用的话, 可以将 XID 存储在 HTTP Header 中, 在官方的提供的 Sample 中有一份样例代码:

// 实现 servlet filter, 这样服务提供者能从 Http request 中获取 xid 并绑定进事务 Context
@Component
public class SeataFilter implements Filter {
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }
}
// 实现 ClientHttpRequestInterceptor, 服务请求方讲 xid 塞入 http request header
public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }

        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }
}
// 找到所有 RestTemplate 将 SeataRestTemplateInterceptor 注入
@Configuration
public class SeataRestTemplateAutoConfiguration {
    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;
    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
            Iterator var1 = this.restTemplates.iterator();
            while (var1.hasNext()) {
                RestTemplate restTemplate = (RestTemplate) var1.next();
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());
                interceptors.add(this.seataRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }
    }
}

至此, TM 的重要功能就介绍完了, 值得注意的是事务的传播过程不仅仅是 TM -> RM, RM -> RM 也会进行。接下来, 我们看一看 Seata 两种分支事务类型 AT 和 TCC 的实现方案。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

stun

标签:RootContext,Transaction,return,Seata,Manager,new,xid,public,String
来源: https://blog.csdn.net/BeiKeJieDeLiuLangMao/article/details/115265495