Fescar的AT模式演示
作者:互联网
准备工作
首先导入模块day12的changgou_common_fescar
目录结构如下
里面重要的是一个配置类,一过滤器,一个拦截器
配置类FescarAutoConfiguration如下
package com.changgou.fescar.config; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.fescar.rm.datasource.DataSourceProxy; import com.alibaba.fescar.spring.annotation.GlobalTransactionScanner; import com.changgou.fescar.filter.FescarRMRequestFilter; import com.changgou.fescar.interceptor.FescarRestInterceptor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import org.springframework.http.client.ClientHttpRequestInterceptor; import org.springframework.web.client.RestTemplate; import javax.sql.DataSource; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Collection; import java.util.List; /** * * 创建数据源 * * 定义全局事务管理器扫描对象 * * 给所有RestTemplate添加头信息防止微服务之间调用问题 */ @Configuration public class FescarAutoConfiguration { public static final String FESCAR_XID = "fescarXID"; /*** * 创建代理数据库 * @param environment * @return */ @Bean public DataSource dataSource(Environment environment){ DruidDataSource dataSource = new DruidDataSource(); dataSource.setUrl(environment.getProperty("spring.datasource.url")); try { dataSource.setDriver(DriverManager.getDriver(environment.getProperty("spring.datasource.url"))); } catch (SQLException e) { throw new RuntimeException("can't recognize dataSource Driver"); } dataSource.setUsername(environment.getProperty("spring.datasource.username")); dataSource.setPassword(environment.getProperty("spring.datasource.password")); return new DataSourceProxy(dataSource); } /*** * 全局事务扫描器 * 用来解析带有@GlobalTransactional注解的方法,然后采用AOP的机制控制事务 * @param environment * @return */ @Bean public GlobalTransactionScanner globalTransactionScanner(Environment environment){ String applicationName = environment.getProperty("spring.application.name"); String groupName = environment.getProperty("fescar.group.name"); if(applicationName == null){ return new GlobalTransactionScanner(groupName == null ? "my_test_tx_group" : groupName); }else{ return new GlobalTransactionScanner(applicationName, groupName == null ? "my_test_tx_group" : groupName); } } /*** * 每次微服务和微服务之间相互调用 * 要想控制全局事务,每次TM都会请求TC生成一个XID,每次执行下一个事务,也就是调用其他微服务的时候都需要将该XID传递过去 * 所以我们可以每次请求的时候,都获取头中的XID,并将XID传递到下一个微服务 * @param restTemplates * @return */ @ConditionalOnBean({RestTemplate.class}) @Bean public Object addFescarInterceptor(Collection<RestTemplate> restTemplates){ restTemplates.stream() .forEach(restTemplate -> { List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors(); if(interceptors != null){ interceptors.add(fescarRestInterceptor()); } }); return new Object(); } @Bean public FescarRMRequestFilter fescarRMRequestFilter(){ return new FescarRMRequestFilter(); } @Bean public FescarRestInterceptor fescarRestInterceptor(){ return new FescarRestInterceptor(); } }
过滤器FescarRMRequestFilter如下 作用为给线程加XID
package com.changgou.fescar.filter;
import com.alibaba.fescar.core.context.RootContext;
import com.changgou.fescar.config.FescarAutoConfiguration;
import org.slf4j.Logger;
import org.springframework.util.StringUtils;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
*
*/
public class FescarRMRequestFilter extends OncePerRequestFilter {
private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger( FescarRMRequestFilter.class);
/**
* 给每次线程请求绑定一个XID
* @param request
* @param response
* @param filterChain
*/
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
String currentXID = request.getHeader( FescarAutoConfiguration.FESCAR_XID);
if(!StringUtils.isEmpty(currentXID)){
RootContext.bind(currentXID);
LOGGER.info("当前线程绑定的XID :" + currentXID);
}
try{
filterChain.doFilter(request, response);
} finally {
String unbindXID = RootContext.unbind();
if(unbindXID != null){
LOGGER.info("当前线程从指定XID中解绑 XID :" + unbindXID);
if(!currentXID.equals(unbindXID)){
LOGGER.info("当前线程的XID发生变更");
}
}
if(currentXID != null){
LOGGER.info("当前线程的XID发生变更");
}
}
}
}
拦截器FescarRestInterceptor内容为 作用为在微服务间转发XID 注意这个拦截器拦的是Feign请求,所以远程调用要用feign
package com.changgou.fescar.interceptor; import com.alibaba.fescar.core.context.RootContext; import com.changgou.fescar.config.FescarAutoConfiguration; import feign.RequestInterceptor; import feign.RequestTemplate; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpRequest; import org.springframework.http.client.ClientHttpRequestExecution; import org.springframework.http.client.ClientHttpRequestInterceptor; import org.springframework.http.client.ClientHttpResponse; import org.springframework.util.StringUtils; import java.io.IOException; import java.util.Collections; /** * */ public class FescarRestInterceptor implements RequestInterceptor, ClientHttpRequestInterceptor { @Override public void apply(RequestTemplate requestTemplate) { String xid = RootContext.getXID(); if(!StringUtils.isEmpty(xid)){ requestTemplate.header( FescarAutoConfiguration.FESCAR_XID, xid); } } @Override public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { String xid = RootContext.getXID(); if(!StringUtils.isEmpty(xid)){ HttpHeaders headers = request.getHeaders(); headers.put( FescarAutoConfiguration.FESCAR_XID, Collections.singletonList(xid)); } return execution.execute(request, body); } }
配置文件spring.factories如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.changgou.fescar.config.FescarAutoConfiguration
配置文件file.conf如下
transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } } service { #vgroup->rgroup vgroup_mapping.my_test_tx_group = "default" #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false disableGlobalTransaction = false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } }
配置文件registry.conf如下
registry { # file 、nacos 、eureka、redis、zk type = "file" nacos { serverAddr = "localhost" namespace = "public" cluster = "default" } eureka { serviceUrl = "http://localhost:1001/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6381" db = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } file { name = "file.conf" } } config { # file、nacos 、apollo、zk type = "file" nacos { serverAddr = "localhost" namespace = "public" cluster = "default" } apollo { app.id = "fescar-server" apollo.meta = "http://192.168.1.204:8801" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } file { name = "file.conf" } }
pom文件如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>changgou_parent</artifactId> <groupId>com.changgou</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>changgou_common_fescar</artifactId> <properties> <fescar.version>0.4.2</fescar.version> </properties> <dependencies> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.alibaba.fescar</groupId> <artifactId>fescar-tm</artifactId> <version>${fescar.version}</version> </dependency> <dependency> <groupId>com.alibaba.fescar</groupId> <artifactId>fescar-spring</artifactId> <version>${fescar.version}</version> </dependency> </dependencies> </project>
启动fescar服务
day12的资源里有一个fescar-server-0.4.2,这是fescar的服务器
把服务器复制到changgou_parent的平级目录,打开bin,执行.bat文件,注意不能在别处启动
mysql中已经在changgou_order中造好了undo_log表
使用fescar的AT模式
我们有两个模块,订单模块和货物模块,当订单中生成新订单时,会用Feign远程调用货物模块减少库存
现在我们故意让订单代码出错,错误在调用减少货物之后
如果用@Transactional注解,数据空中不会新增订单,但会减少库存,因为本地事务只能管理本模块
在两个模块中引入准备中的fescar模块的坐标
<!--fescar依赖--> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_common_fescar</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
在订单模块的添加订单方法上加一个注解
@GlobalTransactional(name = "order_add")
注意只有这个一个注解,重启订单模块和货物模块,注意都要重启,即使货物模块没有改动
重启时注意准备阶段打开的fescar客户端,发现两个模块都注册到了fescar
此时执行,依然出错,但没有新增订单也没有减少库存
1
1
1
标签:XID,演示,fescar,模式,Fescar,org,import,com,changgou 来源: https://www.cnblogs.com/wrc-blog/p/14308510.html