其他分享
首页 > 其他分享> > Fescar的AT模式演示

Fescar的AT模式演示

作者:互联网

准备工作

首先导入模块day12的changgou_common_fescar

目录结构如下

里面重要的是一个配置类,一个过滤器,一个拦截器

配置类FescarAutoConfiguration如下

package com.changgou.fescar.config;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fescar.rm.datasource.DataSourceProxy;
import com.alibaba.fescar.spring.annotation.GlobalTransactionScanner;
import com.changgou.fescar.filter.FescarRMRequestFilter;
import com.changgou.fescar.interceptor.FescarRestInterceptor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.web.client.RestTemplate;

import javax.sql.DataSource;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;

/**
 * Auto-configuration for the shared Fescar module. It:
 * <ul>
 *   <li>creates the data source and wraps it in a Fescar {@link DataSourceProxy};</li>
 *   <li>registers the {@link GlobalTransactionScanner};</li>
 *   <li>adds the XID-forwarding interceptor to every {@link RestTemplate} so that calls
 *       between microservices carry the XID header.</li>
 * </ul>
 */
@Configuration
public class FescarAutoConfiguration {

    /** Name of the HTTP header used to pass the XID between services. */
    public static final String FESCAR_XID = "fescarXID";

    /** Transaction group name used when the {@code fescar.group.name} property is not set. */
    private static final String DEFAULT_TX_GROUP = "my_test_tx_group";

    /**
     * Creates the proxied data source.
     *
     * <p>A {@link DruidDataSource} is configured from the {@code spring.datasource.url},
     * {@code spring.datasource.username} and {@code spring.datasource.password} properties
     * and then wrapped in a {@link DataSourceProxy}.
     *
     * @param environment source of the {@code spring.datasource.*} properties
     * @return the Druid data source wrapped in a Fescar {@link DataSourceProxy}
     * @throws RuntimeException if no JDBC driver can be resolved for the configured URL;
     *                          the underlying {@link SQLException} is attached as the cause
     */
    @Bean
    public DataSource dataSource(Environment environment) {
        String url = environment.getProperty("spring.datasource.url");
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl(url);
        try {
            dataSource.setDriver(DriverManager.getDriver(url));
        } catch (SQLException e) {
            // Keep the original exception as the cause so the real reason is not lost.
            throw new RuntimeException("can't recognize dataSource Driver", e);
        }
        dataSource.setUsername(environment.getProperty("spring.datasource.username"));
        dataSource.setPassword(environment.getProperty("spring.datasource.password"));
        return new DataSourceProxy(dataSource);
    }

    /**
     * Creates the global transaction scanner, which resolves methods annotated with
     * {@code @GlobalTransactional} and controls their transactions through AOP.
     *
     * <p>The transaction group is read from {@code fescar.group.name} and falls back to
     * {@code "my_test_tx_group"}. When {@code spring.application.name} is set it is passed
     * to the scanner as well; otherwise the single-argument constructor is used.
     *
     * @param environment source of {@code spring.application.name} and {@code fescar.group.name}
     * @return the configured scanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(Environment environment) {
        String applicationName = environment.getProperty("spring.application.name");
        String configuredGroup = environment.getProperty("fescar.group.name");
        String txGroup = configuredGroup == null ? DEFAULT_TX_GROUP : configuredGroup;
        if (applicationName == null) {
            return new GlobalTransactionScanner(txGroup);
        }
        return new GlobalTransactionScanner(applicationName, txGroup);
    }

    /**
     * Adds the XID-forwarding interceptor to every {@link RestTemplate} bean.
     *
     * <p>For a global transaction to span several microservices, the XID has to be passed
     * along on every call to the next service; the interceptor added here copies the current
     * XID into the outgoing request headers.
     *
     * @param restTemplates all {@link RestTemplate} beans in the context
     * @return a placeholder object; the method is only a bean so that it runs at start-up
     */
    @ConditionalOnBean({RestTemplate.class})
    @Bean
    public Object addFescarInterceptor(Collection<RestTemplate> restTemplates) {
        for (RestTemplate restTemplate : restTemplates) {
            List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
            if (interceptors != null) {
                interceptors.add(fescarRestInterceptor());
            }
        }
        return new Object();
    }

    /**
     * Registers the servlet filter that binds the XID of an incoming request to the thread.
     *
     * @return a new {@link FescarRMRequestFilter}
     */
    @Bean
    public FescarRMRequestFilter fescarRMRequestFilter() {
        return new FescarRMRequestFilter();
    }

    /**
     * Registers the interceptor that forwards the XID on outgoing calls.
     *
     * @return a new {@link FescarRestInterceptor}
     */
    @Bean
    public FescarRestInterceptor fescarRestInterceptor() {
        return new FescarRestInterceptor();
    }
}

过滤器FescarRMRequestFilter如下 作用为给线程加XID


package com.changgou.fescar.filter;

import com.alibaba.fescar.core.context.RootContext;
import com.changgou.fescar.config.FescarAutoConfiguration;
import org.slf4j.Logger;
import org.springframework.util.StringUtils;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * Servlet filter that takes the XID from the {@code fescarXID} request header and binds it
 * to {@link RootContext} for the duration of the request.
 */
public class FescarRMRequestFilter extends OncePerRequestFilter {

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(FescarRMRequestFilter.class);

    /**
     * Binds the XID carried by the request (if any) before the rest of the chain runs and
     * unbinds it afterwards, whether or not the chain threw.
     *
     * <p>A "changed" message is logged only when the XID unbound after the chain differs
     * from the one this filter bound (treating "nothing bound" as {@code null}).
     *
     * @param request     the incoming request; its {@code fescarXID} header is read
     * @param response    the response passed on to the chain
     * @param filterChain the remaining filter chain
     * @throws ServletException propagated from the filter chain
     * @throws IOException      propagated from the filter chain
     */
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        String currentXID = request.getHeader(FescarAutoConfiguration.FESCAR_XID);
        // Only a non-empty header value is bound; remember what (if anything) was bound.
        String boundXID = StringUtils.isEmpty(currentXID) ? null : currentXID;
        if (boundXID != null) {
            RootContext.bind(boundXID);
            LOGGER.info("当前线程绑定的XID :" + boundXID);
        }
        try {
            filterChain.doFilter(request, response);
        } finally {
            String unbindXID = RootContext.unbind();
            if (unbindXID != null) {
                LOGGER.info("当前线程从指定XID中解绑 XID :" + unbindXID);
            }
            // Null-safe comparison: the request may have had no XID header, so boundXID can be
            // null here. Throwing from this finally block would mask an exception from the chain.
            boolean xidChanged = boundXID == null ? unbindXID != null : !boundXID.equals(unbindXID);
            if (xidChanged) {
                LOGGER.info("当前线程的XID发生变更");
            }
        }
    }
}

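拦截器FescarRestInterceptor内容如下,作用为在微服务间转发XID。注意这个拦截器同时实现了Feign的RequestInterceptor和Spring的ClientHttpRequestInterceptor:通过Feign进行的远程调用会带上XID,配置类中也会把它加到所有RestTemplate上;本文的演示使用Feign进行远程调用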

package com.changgou.fescar.interceptor;

import com.alibaba.fescar.core.context.RootContext;
import com.changgou.fescar.config.FescarAutoConfiguration;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.Collections;

/**
 * Copies the XID returned by {@link RootContext#getXID()} into the {@code fescarXID} header
 * of outgoing requests. Implements both the Feign {@link RequestInterceptor} and the Spring
 * {@link ClientHttpRequestInterceptor} contracts.
 */
public class FescarRestInterceptor implements RequestInterceptor, ClientHttpRequestInterceptor {

    /**
     * Feign hook: adds the XID header to the request template when an XID is available.
     *
     * @param requestTemplate the Feign request being built
     */
    @Override
    public void apply(RequestTemplate requestTemplate) {
        final String currentXid = RootContext.getXID();
        if (StringUtils.isEmpty(currentXid)) {
            // No XID available: leave the request untouched.
            return;
        }
        requestTemplate.header(FescarAutoConfiguration.FESCAR_XID, currentXid);
    }

    /**
     * Spring HTTP client hook: sets the XID header when an XID is available, then continues
     * the execution.
     *
     * @param request   the outgoing request whose headers may be modified
     * @param body      the request body, passed through unchanged
     * @param execution the remaining request execution
     * @return the response produced by {@code execution}
     * @throws IOException propagated from {@code execution}
     */
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        final String currentXid = RootContext.getXID();
        final boolean hasXid = !StringUtils.isEmpty(currentXid);
        if (hasXid) {
            request.getHeaders().put(FescarAutoConfiguration.FESCAR_XID, Collections.singletonList(currentXid));
        }
        return execution.execute(request, body);
    }
}

配置文件spring.factories如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.changgou.fescar.config.FescarAutoConfiguration

 

配置文件file.conf如下

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  #thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size = 1
    #auto default pin or 8
    worker-thread-size = 8
  }
}
# Service section: maps the transaction group to a server group and lists that group's address.
service {
  #vgroup->rgroup
  # "my_test_tx_group" is also the default group name FescarAutoConfiguration passes to
  # GlobalTransactionScanner when fescar.group.name is not set.
  # NOTE(review): presumably the key suffix must equal that group name - confirm against the Fescar docs.
  vgroup_mapping.my_test_tx_group = "default"
  #only support single node
  # NOTE(review): presumably the host:port of the Fescar server for the "default" group - confirm.
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  disableGlobalTransaction = false
}

client {
  async.commit.buffer.limit = 10000
  lock {
    retry.internal = 10
    retry.times = 30
  }
}

 

配置文件registry.conf如下

registry {
  # file 、nacos 、eureka、redis、zk
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:1001/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6381"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  apollo {
    app.id = "fescar-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}

 

pom文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>changgou_parent</artifactId>
        <groupId>com.changgou</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>changgou_common_fescar</artifactId>
    <properties>
        <fescar.version>0.4.2</fescar.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.changgou</groupId>
            <artifactId>changgou_common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fescar</groupId>
            <artifactId>fescar-tm</artifactId>
            <version>${fescar.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fescar</groupId>
            <artifactId>fescar-spring</artifactId>
            <version>${fescar.version}</version>
        </dependency>
    </dependencies>

</project>

 

启动fescar服务

day12的资源里有一个fescar-server-0.4.2,这是fescar的服务器

把服务器复制到changgou_parent的平级目录,打开bin,执行.bat文件,注意不能在别处启动

 

mysql中已经在changgou_order中造好了undo_log表


 

使用fescar的AT模式

我们有两个模块,订单模块和货物模块,当订单中生成新订单时,会用Feign远程调用货物模块减少库存

现在我们故意让订单代码出错,错误在调用减少货物之后

如果用@Transactional注解,数据库中不会新增订单,但会减少库存,因为本地事务只能管理本模块

在两个模块中引入准备中的fescar模块的坐标

<!--fescar依赖-->
<dependency>
    <groupId>com.changgou</groupId>
    <artifactId>changgou_common_fescar</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

 

在订单模块的添加订单方法上加一个注解

@GlobalTransactional(name = "order_add")

 

注意只有这一个注解,重启订单模块和货物模块,注意都要重启,即使货物模块没有改动

重启时注意观察准备阶段启动的fescar服务端窗口,可以发现两个模块都注册到了fescar

此时执行,依然出错,但没有新增订单也没有减少库存


 

1

1

1

标签:XID,演示,fescar,模式,Fescar,org,import,com,changgou
来源: https://www.cnblogs.com/wrc-blog/p/14308510.html