其他分享
首页 > 其他分享> > NetCore框架WTM的分表分库实现

NetCore框架WTM的分表分库实现

作者:互联网

介绍

本期主角:

ShardingCore最新版本针对路由有了极大性能的优化由原先的Expression改成自定义的RouteExpression去除了Compile带来的性能损耗

我不是efcore怎么办

这边肯定有小伙伴要问有没有不是efcore的,我这边很确信的和你讲有并且适应所有的ADO.NET包括sqlhelper
ShardingConnector 一款基于ado.net下的高性能分表分库解决方案目前已有demo案例,这个框架你可以认为是.Net版本的ShardingSphere但是目前仅实现了ShardingSphere-JDBC,后续我将会实现ShardingSphere-Proxy希望各位.Neter多多关注

背景

之前我不是发了一篇博客吗.Net分表分库动态化处理 下面有个小伙伴留言,希望可以让我支持一下WTM 框架。我心想着处于对自己的框架的自信,并且之前有过对abpvnexfurion等一系列框架的兼容适配的尝试,原则上将只要你是efcore那么基本上都可以支持,所以秉着尝试以下的态度这边就上手了,先说下结论就是可以支持,完美不完美不清楚因为本人这个框架用的不多不知道是否是完美适配。

原理

ShardingCore

ShardingCore的整体架构是一个壳dbcontext带多个dbcontext,壳dbcontext不进行增删改查,由内部的dbcontext自己去执行,这个因为efcore的一个对象对应一个表所限制的。我们这边把壳dbcontext称作shellDbContext,执行的dbcontext叫做executorDbContext,对于ShardingCore还有一个要求就是需要初始化启动的时候Start()Start()内部需要IServiceProvider来获取DbContext,所以说整个框架离不开ioc,那么就需要启动的时候依赖注入DbContext,又因为依赖注入如果是默认的只能允许单个构造函数。这就是ShardingCore在兼容使用的时候需要注意的地方。

WTM

WTM这边我不是很熟悉,花了大概半个小时到一个小时左右的时间,进行了代码的翻阅,大概了解了其中的实现,DbContext的创建由独立的构造函数来实现,默认通过DbContext的内部方法 OnConfiguring(DbContextOptionsBuilder optionsBuilder)来进行初始化,框架里面将DbContext抽象成了IDataContext接口,框架默IDataContext接口默认依赖注入为NullDbContext如果需要使用会自行通过反射调用构造函数参数为CS类型的那一个。整体的efcore上的一些处理通过调试代码和源码的查看基本上了解了

开始接入

创建项目

那么我们首先通过WTM生成一个脚手架的简单项目,这边生成了一个mvc的项目。

添加依赖

添加ShardingCore依赖,需要x.5.0.6+版本,x代表efcore的版本

Install-Package ShardingCore -Version 6.5.0.6

添加抽象分表DbContext

这边和AbpVNext时候继承一样,因为c#不支持多继承,好在ShardingCore是接口依赖不存在实现依赖所以任何框架都可以兼容。


    public abstract class AbstractShardingFrameworkContext:FrameworkContext, IShardingDbContext, ISupportShardingReadWrite
    {
        protected IShardingDbContextExecutor ShardingDbContextExecutor
        {
            get;
        }

        public AbstractShardingFrameworkContext(CS cs)
            : base(cs)
        {
            
            ShardingDbContextExecutor =
                (IShardingDbContextExecutor)Activator.CreateInstance(
                    typeof(ShardingDbContextExecutor<>).GetGenericType0(this.GetType()),this);
            IsExecutor = false;
        }
        
        public AbstractShardingFrameworkContext(string cs, DBTypeEnum dbtype)
            : base(cs, dbtype)
        {
            ShardingDbContextExecutor =
                (IShardingDbContextExecutor)Activator.CreateInstance(
                    typeof(ShardingDbContextExecutor<>).GetGenericType0(this.GetType()),this);
            IsExecutor = false;
        }
        
        public AbstractShardingFrameworkContext(string cs, DBTypeEnum dbtype, string version = null)
            : base(cs, dbtype, version)
        {
            ShardingDbContextExecutor =
                (IShardingDbContextExecutor)Activator.CreateInstance(
                    typeof(ShardingDbContextExecutor<>).GetGenericType0(this.GetType()),this);
            IsExecutor = false;
        }

        public AbstractShardingFrameworkContext(DbContextOptions options) : base(options)
        {
            var wrapOptionsExtension = options.FindExtension<ShardingWrapOptionsExtension>();
            if (wrapOptionsExtension != null)
            {
                ShardingDbContextExecutor =
                    (IShardingDbContextExecutor)Activator.CreateInstance(
                        typeof(ShardingDbContextExecutor<>).GetGenericType0(this.GetType()),this);
            }

            IsExecutor = wrapOptionsExtension == null;
        }
        
        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            if (this.CSName!=null)
            {
                base.OnConfiguring(optionsBuilder);
                optionsBuilder.UseSharding<DataContext>();
            }
        }
        /// <summary>
        /// 读写分离优先级
        /// </summary>
        public int ReadWriteSeparationPriority
        {
            get => ShardingDbContextExecutor.ReadWriteSeparationPriority;
            set => ShardingDbContextExecutor.ReadWriteSeparationPriority = value;
        }
        /// <summary>
        /// 是否使用读写分离
        /// </summary>
        public bool ReadWriteSeparation
        {
            get => ShardingDbContextExecutor.ReadWriteSeparation;
            set => ShardingDbContextExecutor.ReadWriteSeparation = value;
        }

        /// <summary>
        /// 是否是真正的执行者
        /// </summary>
        public bool IsExecutor { get;}



        public DbContext GetDbContext(string dataSourceName, bool parallelQuery, IRouteTail routeTail)
        {
            return ShardingDbContextExecutor.CreateDbContext(parallelQuery, dataSourceName, routeTail);
        }

        /// <summary>
        /// 根据对象创建通用的dbcontext
        /// </summary>
        /// <typeparam name="TEntity"></typeparam>
        /// <param name="entity"></param>
        /// <returns></returns>
        public DbContext CreateGenericDbContext<TEntity>(TEntity entity) where TEntity : class
        {
            return ShardingDbContextExecutor.CreateGenericDbContext(entity);
        }

        public IVirtualDataSource GetVirtualDataSource()
        {
            return ShardingDbContextExecutor.GetVirtualDataSource();
        }


        public override EntityEntry Add(object entity)
        {
            if (IsExecutor)
                base.Add(entity);
            return CreateGenericDbContext(entity).Add(entity);
        }

        public override EntityEntry<TEntity> Add<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return base.Add(entity);
            return CreateGenericDbContext(entity).Add(entity);
        }

        public override ValueTask<EntityEntry<TEntity>> AddAsync<TEntity>(TEntity entity, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return base.AddAsync(entity, cancellationToken);
            return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
        }

        public override ValueTask<EntityEntry> AddAsync(object entity, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return base.AddAsync(entity, cancellationToken);
            return CreateGenericDbContext(entity).AddAsync(entity, cancellationToken);
        }

        private Dictionary<DbContext, IEnumerable<TEntity>> AggregateToDic<TEntity>(IEnumerable<TEntity> entities) where TEntity:class
        {
            return entities.Select(o =>
            {
                var dbContext = CreateGenericDbContext(o);
                return new
                {
                    DbContext = dbContext,
                    Entity = o
                };
            }).GroupBy(g => g.DbContext).ToDictionary(o => o.Key, o => o.Select(g => g.Entity));
        }
        public override void AddRange(params object[] entities)
        {
            if (IsExecutor)
            {
                base.AddRange(entities);
                return;
            }

            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                aggregateKv.Key.AddRange(aggregateKv.Value);
            }
        }

        public override void AddRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                base.AddRange(entities);
                return;
            }

            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                aggregateKv.Key.AddRange(aggregateKv.Value);
            }
        }

        public override async Task AddRangeAsync(params object[] entities)
        {
            if (IsExecutor)
            {
                await base.AddRangeAsync(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                await aggregateKv.Key.AddRangeAsync(aggregateKv.Value);
            }
        }

        public override async Task AddRangeAsync(IEnumerable<object> entities, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
            {
                await base.AddRangeAsync(entities, cancellationToken);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                await aggregateKv.Key.AddRangeAsync(aggregateKv.Value,cancellationToken);
            }
        }

        public override EntityEntry<TEntity> Attach<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return base.Attach(entity);
            return CreateGenericDbContext(entity).Attach(entity);
        }

        public override EntityEntry Attach(object entity)
        {
            if (IsExecutor)
                return base.Attach(entity);
            return CreateGenericDbContext(entity).Attach(entity);
        }

        public override void AttachRange(params object[] entities)
        {
            if (IsExecutor)
            {
                base.AttachRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                 aggregateKv.Key.AttachRange(aggregateKv.Value);
            }
        }

        public override void AttachRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                base.AttachRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                aggregateKv.Key.AttachRange(aggregateKv.Value);
            }
        }

        public override EntityEntry<TEntity> Entry<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return base.Entry(entity);
            return CreateGenericDbContext(entity).Entry(entity);
        }

        public override EntityEntry Entry(object entity)
        {
            if (IsExecutor)
                return base.Entry(entity);
            return CreateGenericDbContext(entity).Entry(entity);
        }

        public override EntityEntry<TEntity> Update<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return base.Update(entity);
            return CreateGenericDbContext(entity).Update(entity);
        }

        public override EntityEntry Update(object entity)
        {
            if (IsExecutor)
                return base.Update(entity);
            return CreateGenericDbContext(entity).Update(entity);
        }

        public override void UpdateRange(params object[] entities)
        {
            if (IsExecutor)
            {
                base.UpdateRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                aggregateKv.Key.UpdateRange(aggregateKv.Value);
            }
        }

        public override void UpdateRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                base.UpdateRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                aggregateKv.Key.UpdateRange(aggregateKv.Value);
            }
        }

        public override EntityEntry<TEntity> Remove<TEntity>(TEntity entity)
        {
            if (IsExecutor)
                return base.Remove(entity);
            return CreateGenericDbContext(entity).Remove(entity);
        }

        public override EntityEntry Remove(object entity)
        {
            if (IsExecutor)
                return base.Remove(entity);
            return CreateGenericDbContext(entity).Remove(entity);
        }

        public override void RemoveRange(params object[] entities)
        {
            if (IsExecutor)
            {
                base.RemoveRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                aggregateKv.Key.RemoveRange(aggregateKv.Value);
            }
        }

        public override void RemoveRange(IEnumerable<object> entities)
        {
            if (IsExecutor)
            {
                base.RemoveRange(entities);
                return;
            }
            var aggregateToDic = AggregateToDic(entities);
            foreach (var aggregateKv in aggregateToDic)
            {
                aggregateKv.Key.RemoveRange(aggregateKv.Value);
            }
        }

        public override int SaveChanges()
        {

            if (IsExecutor)
                return base.SaveChanges();
            return this.SaveChanges(true);
        }

        public override int SaveChanges(bool acceptAllChangesOnSuccess)
        {
            if (IsExecutor)
                return base.SaveChanges(acceptAllChangesOnSuccess);
            //ApplyShardingConcepts();
            int i = 0;
            //如果是内部开的事务就内部自己消化
            if (Database.AutoTransactionsEnabled&&Database.CurrentTransaction==null&&ShardingDbContextExecutor.IsMultiDbContext)
            {
                using (var tran = Database.BeginTransaction())
                {
                    i = ShardingDbContextExecutor.SaveChanges(acceptAllChangesOnSuccess);
                    tran.Commit();
                }
            }
            else
            {
                i = ShardingDbContextExecutor.SaveChanges(acceptAllChangesOnSuccess);
            }

            return i;
        }


        public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return base.SaveChangesAsync(cancellationToken);
            return this.SaveChangesAsync(true, cancellationToken);
        }

        public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess, CancellationToken cancellationToken = new CancellationToken())
        {
            if (IsExecutor)
                return await base.SaveChangesAsync(acceptAllChangesOnSuccess,cancellationToken);
            //ApplyShardingConcepts();
            int i = 0;
            //如果是内部开的事务就内部自己消化
            if (Database.AutoTransactionsEnabled && Database.CurrentTransaction==null && ShardingDbContextExecutor.IsMultiDbContext)
            {
                using (var tran = await Database.BeginTransactionAsync(cancellationToken))
                {
                    i = await ShardingDbContextExecutor.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
                    await tran.CommitAsync(cancellationToken);
                }
            }
            else
            {
                i = await ShardingDbContextExecutor.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
            }


            return i;
        }

        public override void Dispose()
        {

            if (IsExecutor)
            {
                base.Dispose();
            }
            else
            {
                ShardingDbContextExecutor.Dispose();
                base.Dispose();
            }
        }

        public override async ValueTask DisposeAsync()
        {
            if (IsExecutor)
            {
                await base.DisposeAsync();
            }
            else
            {
                await ShardingDbContextExecutor.DisposeAsync();

                await base.DisposeAsync();
            }
        }
        public Task RollbackAsync(CancellationToken cancellationToken = new CancellationToken())
        {
            return ShardingDbContextExecutor.RollbackAsync(cancellationToken);
        }

        public Task CommitAsync(CancellationToken cancellationToken = new CancellationToken())
        {
            return ShardingDbContextExecutor.CommitAsync(cancellationToken);
        }

        public void NotifyShardingTransaction()
        {
            ShardingDbContextExecutor.NotifyShardingTransaction();
        }

        public void Rollback()
        {
            ShardingDbContextExecutor.Rollback();
        }

        public void Commit()
        {
            ShardingDbContextExecutor.Commit();
        }
        
    }

简单说一下这边实现了WTM的所有构造函数,因为ShardingCore原生需要DbContextOption,当然也是可以支持实现类由自定义DbContext,构造函数中如果使用了DbContextOption那么就是由依赖注入或者ShardingCore创建的DbContext,其余的全部是WTM创建的,所以这边都需要实现并且其余的构造函数直接设置为ShellDbContext

又因为WTM默认的创建会赋值CSName所以需要对其后续进行UseSharding处理这是ShardingCore针对ShellDbContext必须要处理的


        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            if (this.CSName!=null)
            {
                base.OnConfiguring(optionsBuilder);
                optionsBuilder.UseSharding<DataContext>();
            }
        }

实现DataContext

很简单只需要继承抽象类和实现IShardingTableDbContext接口即可,实现该接口才能支持分表否则仅支持分库

 public class DataContext : AbstractShardingFrameworkContext,IShardingTableDbContext
{
}

编写自定义DbContext创建

因为WTM框架的DbContext拥有多个构造函数所以需要自定义,由ShardingCore提供

代码其实很简单就是如何创建一个DbContext,因为ShardingCore默认的会校验只能拥有一个构造函数并且构造函数只能是DbContextOptions或者DbContextOptions<>

public class WTMDbContextCreator<TShardingDbContext>:IDbContextCreator<TShardingDbContext>  where TShardingDbContext : DbContext, IShardingDbContext
{
    public DbContext CreateDbContext(DbContext shellDbContext, ShardingDbContextOptions shardingDbContextOptions)
    {
        var context = new DataContext((DbContextOptions<DataContext>)shardingDbContextOptions.DbContextOptions);
        context.RouteTail = shardingDbContextOptions.RouteTail;
        return context;
    }
}

编写分表测试类

    public class Todo
    {
    public string Id { get; set; }
    public string Name { get; set; }
    }

然后再DbContext出简单设置一下

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
            //你用dbset也是可以的
            modelBuilder.Entity<Todo>(e =>
            {
                e.HasKey(o => o.Id);
                e.ToTable(nameof(Todo));
            });
        }

添加分表路由


    public class TodoRoute:AbstractSimpleShardingModKeyStringVirtualTableRoute<Todo>
    {
        public TodoRoute() : base(2, 10)
        {
        }

        public override void Configure(EntityMetadataTableBuilder<Todo> builder)
        {
            builder.ShardingProperty(o => o.Id);
        }
    }

StartUp

接下来就是激动人心的时候了,首先我们说过ShardingCore需要依赖注入,由因为DbContext是多构造函数

services.AddScoped<DataContext>(sp =>
            {
                var dbContextOptionsBuilder = new DbContextOptionsBuilder<DataContext>();
                dbContextOptionsBuilder.UseMySql(
                    "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;",
                    new MySqlServerVersion(new Version()));
                dbContextOptionsBuilder.UseSharding<DataContext>();
                return new DataContext(dbContextOptionsBuilder.Options);
            });

注意依赖注入获取的是ShellDbContext所以我们需要对其进行UseSharding()

再来我们需要配置ShardingCore

services.AddShardingConfigure<DataContext>()
                .AddEntityConfig(o =>
                {
                    o.CreateShardingTableOnStart = true;
                    o.EnsureCreatedWithOutShardingTable = true;
                    o.AddShardingTableRoute<TodoRoute>();
                })
                .AddConfig(o =>
                {
                    o.AddDefaultDataSource("ds0",
                        "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;");
                    o.ConfigId = "c1";
                    o.UseShardingQuery((conn, build) =>
                    {
                        build.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
                    });
                    o.UseShardingTransaction((conn,build)=>
                        build.UseMySql(conn,new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger)
                        );
                    o.ReplaceTableEnsureManager(sp => new MySqlTableEnsureManager<DataContext>());
                }).EnsureConfig();

这边的配置就是ShardingCore很简单可以查询文档或者过往的博客

这个时候有人要说了为什么不使用AddShardingDbContext因为多构造函数默认不支持需要手动处理。

替换ShardingCoreDbContext创建,我们刚才写的

   services.Replace(ServiceDescriptor.Singleton<IDbContextCreator<DataContext>, WTMDbContextCreator<DataContext>>());

再然后替换WTMIDataContext

//这是WTM的默认的需要替换掉
//services.TryAddScoped<IDataContext, NullContext>();
  services.Replace(ServiceDescriptor.Scoped<IDataContext>(sp =>
            {
                return sp.GetService<DataContext>();
            }));

然后启动初始化ShardingCore

            app.ApplicationServices.GetRequiredService<IShardingBootstrapper>().Start();

编写测试demo

  public async Task<ActionResult> Login(LoginVM vm)
        {
            var dataContext = Wtm.DC;
            var todos = new List<Todo>();
            for (int i = 0; i < 100; i++)
            {
                var todo = new Todo();
                todo.Id = Guid.NewGuid().ToString("n");
                todo.Name = todo.Id;
                todos.Add(todo);
            }

            await dataContext.Set<Todo>().AddRangeAsync(todos);
            await dataContext.SaveChangesAsync();

            var listAsync = await dataContext.Set<Todo>().Take(2).ToListAsync();
....
}

启动运行

完美创建分表并且可以插入查询完全和使用WTM一样

最后的最后

demo地址 https://github.com/xuejmnet/ShardingWTM

您都看到这边了确定不点个star或者赞吗,一款.Net不得不学的分库分表解决方案,简单理解为sharding-jdbc在.net中的实现并且支持更多特性和更优秀的数据聚合,拥有原生性能的97%,并且无业务侵入性,支持未分片的所有efcore原生查询

标签:分库,return,NetCore,WTM,IsExecutor,entity,entities,base,public
来源: https://www.cnblogs.com/xuejiaming/p/16357928.html