Mybatis读写分离
作者:互联网
现成工具介绍
MyCat
活跃的、性能好的开源数据库中间件。它是一个透明化的数据库代理端,在生产环境使用需要保证服务的高可用。
ShardingSphere
ShardingSphere由 JDBC、Proxy 和 Sidecar(规划中)这 3 款既能够独立部署,又支持混合部署配合使用的产品组成。其中ShardingSphere-Proxy与MyCat是相同的定位,而ShardingSphere-JDBC在 Java 的 JDBC 层提供的额外服务。
SpringBoot集成ShardingSphere-JDBC也非常方便,引入包,编写好配置文档就可立即使用。但在事务中有一点点问题,就是在事务中有写操作后,以后的读操作才都从主库读取;也就是说在写操作前,事务里的读还是从从库里读的,这可能会造成脏写。
使用Mybatis拦截器
代码层面的读写分离大部分都是拦截sql, 通过判断sql的读写类型来重定向数据库的,ShardingSphere-JDBC也不例外。
Mybatis允许我们自定义拦截器,需要实现Interceptor接口,并且在自定义拦截器类上添加@Intercepts注解。在@Intercepts注解中,我们可以指定拦截的方法。
开干
多数据源
既然是在代码层面进行读写分离,那肯定是有读&写两个库了,这里就用到了多数据源功能,这里没有用Mybatis/Mybatis-Plus默认的多数据源的生成方式,而是自己配置了多数据源,其实也可以用默认生成的方式,自己写一遍的目的是为了更加了解里面的原理。[配置文件里配置的格式就照Mybatis-Plus多数据源配置的格式配]
代码
多数据源配置
/**
* 数据库主库
*/
@Bean
@ConfigurationProperties("spring.datasource.dynamic.datasource.master")
public DataSource masterDataSource(){
log.info("加载主数据源master DataSource.");
return DruidDataSourceBuilder.create().build();
}
/**
* 数据库从库
*/
@Bean
@ConfigurationProperties("spring.datasource.dynamic.datasource.slave1")
public DataSource slave1DataSource(){
log.info("加载从数据源slave1 DataSource.");
return DruidDataSourceBuilder.create().build();
}
/**
* 动态数据源
*/
@Bean
public DataSource myRoutingDataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
@Qualifier("slave1DataSource") DataSource slave1DataSource) {
log.info("加载[masterDataSource-slave1DataSource]设置为动态数据源DynamicDataSource.");
Map<Object, Object> targetDataSources = new HashMap<>(2);
targetDataSources.put(DBTypeEnum.MASTER, masterDataSource);
targetDataSources.put(DBTypeEnum.SLAVE1, slave1DataSource);
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setDefaultTargetDataSource(masterDataSource);
dynamicDataSource.setTargetDataSources(targetDataSources);
return dynamicDataSource;
}
DBTypeEnum
public enum DBTypeEnum {
/**主库*/
MASTER,
/**从库1*/
SLAVE1
}
DynamicDataSource
在这里指定了数据源的Key,每句sql执行前都会执行determineCurrentLookupKey获取数据源。DbContextHolder.get()
就是获取当前线程中的指定数据源的key,这个key会在自定义拦截器里指定。
public class DynamicDataSource extends AbstractRoutingDataSource {
@Nullable
@Override
protected Object determineCurrentLookupKey() {
return DbContextHolder.get();
}
}
public class DbContextHolder {
private static final ThreadLocal<DBTypeEnum> CONTEXT_HOLDER = new ThreadLocal<>();
private static final AtomicInteger COUNTER = new AtomicInteger(-1);
public static void set(DBTypeEnum dbType) {
log.debug("切换到{}", dbType.name());
CONTEXT_HOLDER.set(dbType);
}
public static DBTypeEnum get() {
return CONTEXT_HOLDER.get();
}
public static DBTypeEnum getMaster() {
return DBTypeEnum.MASTER;
}
public static DBTypeEnum getSlave() {
// 多个从库,可以轮询
int index = COUNTER.getAndIncrement() % 2;
if (COUNTER.get() > 9999) {
COUNTER.set(-1);
}
return DBTypeEnum.SLAVE1;
}
}
拦截器
在上一步我们定义了多数据源,并且设置了数据源选择的依据(DbContextHolder.get()
)。这一步,我们就是要根据一定的规则在拦截器里设置这个依据。
代码
拦截器
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {
MappedStatement.class, Object.class }),
@Signature(type = Executor.class, method = "query", args = {
MappedStatement.class, Object.class, RowBounds.class,
ResultHandler.class }),
@Signature(type = Executor.class, method = "close", args = {boolean.class})
})
public class DbSelectorInterceptor implements Interceptor {
private static final String REGEX = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";
private static final Map<String, DBTypeEnum> CACHE_MAP = new ConcurrentHashMap<>();
@Override
public Object intercept(Invocation invocation) throws Throwable {
String methodName = invocation.getMethod().getName();
String closeMethodName = "close";
boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
DBTypeEnum databaseType = null;
if(!synchronizationActive && !closeMethodName.equals(methodName)) {
Object[] objects = invocation.getArgs();
MappedStatement ms = (MappedStatement) objects[0];
if((databaseType = CACHE_MAP.get(ms.getId())) == null) {
//读方法
if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {
//!selectKey 为自增id查询主键(SELECT LAST_INSERT_ID() )方法,使用主库
if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {
databaseType = DbContextHolder.getMaster();
} else {
BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");
if(sql.matches(REGEX)) {
databaseType = DbContextHolder.getMaster();
} else {
databaseType = DbContextHolder.getSlave();
}
}
}else{
databaseType = DbContextHolder.getMaster();
}
log.debug("设置方法[{}] use [{}] Strategy, SqlCommandType [{}]..", ms.getId(), databaseType.name(), ms.getSqlCommandType().name());
CACHE_MAP.put(ms.getId(), databaseType);
}
} else {
if (synchronizationActive) {
log.debug("事务 use [{}] Strategy", DBTypeEnum.MASTER.name());
} else {
log.debug("close方法 重置为 [{}] Strategy", DBTypeEnum.MASTER.name());
}
databaseType = DbContextHolder.getMaster();
}
DbContextHolder.set(databaseType);
return invocation.proceed();
}
@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
return Plugin.wrap(target, this);
} else {
return target;
}
}
}
这块代码比较长,但核心逻辑就3个:
- 如果开启了事务,都使用主库;
- 如果当前是关闭连接,重置为主库;[ps: 忘了不加会出什么问题]
- 其他情况根据sql语句里的关键词
select
、update
、delete
判断;
配置拦截器
这里基于Mybatis-Plus来配置拦截器。
@Bean
public MybatisSqlSessionFactoryBean sqlSessionFactory(@Qualifier("masterDataSource") DataSource masterDataSource,
@Qualifier("slave1DataSource") DataSource slave1DataSource) throws Exception {
log.info("自定义配置mybatis-plus的SqlSessionFactory.");
MybatisSqlSessionFactoryBean mybatisPlus = new MybatisSqlSessionFactoryBean();
mybatisPlus.setDataSource(myRoutingDataSource(masterDataSource, slave1DataSource));
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setJdbcTypeForNull(JdbcType.NULL);
configuration.setMapUnderscoreToCamelCase(true);
configuration.setCacheEnabled(false);
///自定义配置
mybatisPlus.setConfiguration(configuration);
设置mapper.xml文件的路径
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
org.springframework.core.io.Resource[] resource = resolver.getResources("classpath:mapper/webservice/*.xml");
mybatisPlus.setMapperLocations(resource);
//添加插件到SqlSessionFactory才能生效
mybatisPlus.setPlugins(paginationInterceptor(), new DbSelectorInterceptor());
globalConfig.setMetaObjectHandler(this);
mybatisPlus.setGlobalConfig(globalConfig);
return mybatisPlus;
}
其实就是参考com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration#sqlSessionFactory
,把DbSelectorInterceptor
织入。
以上
标签:return,数据源,读写,分离,class,DBTypeEnum,Mybatis,databaseType,public 来源: https://blog.csdn.net/ooaash/article/details/118047407