其他分享
首页 > 其他分享> > Mybatis读写分离

Mybatis读写分离

作者:互联网

现成工具介绍

MyCat

活跃的、性能好的开源数据库中间件。它是一个透明化的数据库代理端,在生产环境使用需要保证服务的高可用。

ShardingSphere

ShardingSphere由 JDBC、Proxy 和 Sidecar(规划中)这 3 款既能够独立部署,又支持混合部署配合使用的产品组成。其中ShardingSphere-Proxy与MyCat是相同的定位,而ShardingSphere-JDBC在 Java 的 JDBC 层提供的额外服务。

SpringBoot集成ShardingSphere-JDBC也非常方便,引入包,编写好配置文档就可立即使用。但在事务中有一点点问题,就是在事务中有写操作后,以后的读操作才都从主库读取;也就是说在写操作前,事务里的读还是从从库里读的,这可能会造成脏写。

使用Mybatis拦截器

代码层面的读写分离大部分都是拦截sql, 通过判断sql的读写类型来重定向数据库的,ShardingSphere-JDBC也不例外。

Mybatis允许我们自定义拦截器,需要实现Interceptor接口,并且在自定义拦截器类上添加@Intercepts注解。在@Intercepts注解中,我们可以指定拦截的方法。

开干

多数据源

既然是在代码层面进行读写分离,那肯定是有读&写两个库了,这里就用到了多数据源功能,这里没有用Mybatis/Mybatis-Plus默认的多数据源的生成方式,而是自己配置了多数据源,其实也可以用默认生成的方式,自己写一遍的目的是为了更加了解里面的原理。[配置文件里配置的格式就照Mybatis-Plus多数据源配置的格式配]

代码

多数据源配置
/**
 * 数据库主库
 */
@Bean
@ConfigurationProperties("spring.datasource.dynamic.datasource.master")
public DataSource masterDataSource(){
    log.info("加载主数据源master DataSource.");
    return DruidDataSourceBuilder.create().build();
}

/**
 * 数据库从库
 */
@Bean
@ConfigurationProperties("spring.datasource.dynamic.datasource.slave1")
public DataSource slave1DataSource(){
    log.info("加载从数据源slave1 DataSource.");
    return DruidDataSourceBuilder.create().build();
}

/**
 * 动态数据源
 */
@Bean
public DataSource myRoutingDataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
                                      @Qualifier("slave1DataSource") DataSource slave1DataSource) {
    log.info("加载[masterDataSource-slave1DataSource]设置为动态数据源DynamicDataSource.");
    Map<Object, Object> targetDataSources = new HashMap<>(2);
    targetDataSources.put(DBTypeEnum.MASTER, masterDataSource);
    targetDataSources.put(DBTypeEnum.SLAVE1, slave1DataSource);

    DynamicDataSource dynamicDataSource = new DynamicDataSource();
    dynamicDataSource.setDefaultTargetDataSource(masterDataSource);
    dynamicDataSource.setTargetDataSources(targetDataSources);

    return dynamicDataSource;
}
DBTypeEnum
public enum DBTypeEnum {
    /**主库*/
    MASTER,

    /**从库1*/
    SLAVE1
}
DynamicDataSource

在这里指定了数据源的Key,每句sql执行前都会执行determineCurrentLookupKey获取数据源。DbContextHolder.get()就是获取当前线程中的指定数据源的key,这个key会在自定义拦截器里指定。

public class DynamicDataSource extends AbstractRoutingDataSource {

    @Nullable
    @Override
    protected Object determineCurrentLookupKey() {
        return DbContextHolder.get();
    }
}
public class DbContextHolder {
    private static final ThreadLocal<DBTypeEnum> CONTEXT_HOLDER = new ThreadLocal<>();
    private static final AtomicInteger COUNTER = new AtomicInteger(-1);

    public static void set(DBTypeEnum dbType) {
        log.debug("切换到{}", dbType.name());
        CONTEXT_HOLDER.set(dbType);
    }

    public static DBTypeEnum get() {
        return CONTEXT_HOLDER.get();
    }

    public static DBTypeEnum getMaster() {
        return DBTypeEnum.MASTER;
    }

    public static DBTypeEnum getSlave() {
        // 多个从库,可以轮询
        int index = COUNTER.getAndIncrement() % 2;
        if (COUNTER.get() > 9999) {
            COUNTER.set(-1);
        }
        return DBTypeEnum.SLAVE1;
    }

}

拦截器

在上一步我们定义了多数据源,并且设置了数据源选择的依据(DbContextHolder.get())。这一步,我们就是要根据一定的规则在拦截器里设置这个依据。

代码

拦截器
@Intercepts({
        @Signature(type = Executor.class, method = "update", args = {
                MappedStatement.class, Object.class }),
        @Signature(type = Executor.class, method = "query", args = {
                MappedStatement.class, Object.class, RowBounds.class,
                ResultHandler.class }),
        @Signature(type = Executor.class, method = "close", args = {boolean.class})
})
public class DbSelectorInterceptor implements Interceptor {
    private static final String REGEX = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";

    private static final Map<String, DBTypeEnum> CACHE_MAP = new ConcurrentHashMap<>();

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        String methodName = invocation.getMethod().getName();
        String closeMethodName = "close";
        boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
        DBTypeEnum databaseType = null;
        if(!synchronizationActive && !closeMethodName.equals(methodName)) {
            Object[] objects = invocation.getArgs();
            MappedStatement ms = (MappedStatement) objects[0];

            if((databaseType = CACHE_MAP.get(ms.getId())) == null) {
                //读方法
                if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {
                    //!selectKey 为自增id查询主键(SELECT LAST_INSERT_ID() )方法,使用主库
                    if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {
                        databaseType = DbContextHolder.getMaster();
                    } else {
                        BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
                        String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");
                        if(sql.matches(REGEX)) {
                            databaseType = DbContextHolder.getMaster();
                        } else {
                            databaseType = DbContextHolder.getSlave();
                        }
                    }
                }else{
                    databaseType = DbContextHolder.getMaster();
                }
                log.debug("设置方法[{}] use [{}] Strategy, SqlCommandType [{}]..", ms.getId(), databaseType.name(), ms.getSqlCommandType().name());
                CACHE_MAP.put(ms.getId(), databaseType);
            }

        } else {
            if (synchronizationActive) {
                log.debug("事务 use [{}] Strategy", DBTypeEnum.MASTER.name());
            } else {
                log.debug("close方法 重置为 [{}] Strategy", DBTypeEnum.MASTER.name());
            }
            databaseType = DbContextHolder.getMaster();
        }
        DbContextHolder.set(databaseType);

        return invocation.proceed();
    }

    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor) {
            return Plugin.wrap(target, this);
        } else {
            return target;
        }
    }
}

这块代码比较长,但核心逻辑就3个:

  1. 如果开启了事务,都使用主库;
  2. 如果当前是关闭连接,重置为主库;[ps: 忘了不加会出什么问题]
  3. 其他情况根据sql语句里的关键词selectupdatedelete判断;

配置拦截器

这里基于Mybatis-Plus来配置拦截器。

    @Bean
    public MybatisSqlSessionFactoryBean sqlSessionFactory(@Qualifier("masterDataSource") DataSource masterDataSource,
                                               @Qualifier("slave1DataSource") DataSource slave1DataSource) throws Exception {
        log.info("自定义配置mybatis-plus的SqlSessionFactory.");

        MybatisSqlSessionFactoryBean mybatisPlus = new MybatisSqlSessionFactoryBean();
        mybatisPlus.setDataSource(myRoutingDataSource(masterDataSource, slave1DataSource));

        MybatisConfiguration configuration = new MybatisConfiguration();
        configuration.setJdbcTypeForNull(JdbcType.NULL);
        configuration.setMapUnderscoreToCamelCase(true);
        configuration.setCacheEnabled(false);
        ///自定义配置
        mybatisPlus.setConfiguration(configuration);

         设置mapper.xml文件的路径
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        org.springframework.core.io.Resource[] resource = resolver.getResources("classpath:mapper/webservice/*.xml");
        mybatisPlus.setMapperLocations(resource);
        //添加插件到SqlSessionFactory才能生效
        mybatisPlus.setPlugins(paginationInterceptor(), new DbSelectorInterceptor());
        globalConfig.setMetaObjectHandler(this);
        mybatisPlus.setGlobalConfig(globalConfig);
        return mybatisPlus;
    }

其实就是参考com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration#sqlSessionFactory,把DbSelectorInterceptor织入。

以上

标签:return,数据源,读写,分离,class,DBTypeEnum,Mybatis,databaseType,public
来源: https://blog.csdn.net/ooaash/article/details/118047407