Dapr牵手.NET学习笔记:Actor一个场景
作者:互联网
接上一篇最后的场景,为了解决相同帐户并发引起的数据库行级锁,可以引入Actor的串机制,相同ActorID的实例,串行,这样就能在应用层把读取余额的资源争抢解决掉,剩下的工作就是一定时间间隔,把内存中的数据批量更新到数据库中,大大减少了数据库的资源占用。
不废话了,看实现代码吧。
IAccountActor接口
public interface IAccountActor : IActor { Task<decimal> ChargeAsync(decimal amount); }
AccountActor实现
using Dapr.Actors.Runtime; using IOrderFactoryActory.Interfaces; namespace OrderFactoryService { public class AccountActor : Actor, IAccountActor { public AccountActor(ActorHost host) : base(host) { } public async Task<decimal> ChargeAsync(decimal amount) { var balance = 0m; var balanceValue = await this.StateManager.TryGetStateAsync<decimal>("balance"); if (balanceValue.HasValue) { balance = balanceValue.Value; } balance += amount; await this.StateManager.SetStateAsync<decimal>("balance", balance); return balance; } } }
asp.net 6引入dapr.actor
using OrderFactoryService; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; var builder = WebApplication.CreateBuilder(args); builder.Services.AddControllers(); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); builder.Services.AddActors(options => { options.HttpEndpoint = "http://localhost:3999"; options.Actors.RegisterActor<AccountActor>(); }); var app = builder.Build(); if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } app.UseAuthorization(); app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapActorsHandlers(); }); app.MapControllers(); app.Run();
客户端调用我是模拟一次50并发,当然也可以也可以换成web api,自动同步用了一个Job框架Quartz.Net,与Charge操作分离。
using Dapr.Actors; using Dapr.Actors.Client; using IOrderFactoryActory.Interfaces; using Quartz; using Quartz.Impl; using System.Security.Cryptography; Console.WriteLine("回车开始"); Console.ReadLine(); var scheduler = await RunJobAsync(); var factory = new ActorProxyFactory(new ActorProxyOptions { HttpEndpoint = "http://localhost:3999" }); var accountNo = "808080808080808080"; var account = CreateActor(factory, accountNo); var total = 0m; while (true) { var tasks = new List<Task>(); for (var i = 0; i < 50; i++) { var amount = RandomNumberGenerator.GetInt32(100, 5000); var chargeTask = new Task(async () => { var balance = await account.ChargeAsync(amount); Console.WriteLine($"**** 账户:{accountNo} 本次存款:{amount} 缓存余额:{balance} ****"); }); tasks.Add(chargeTask); total += amount; } foreach (var task in tasks) { task.Start(); } Console.WriteLine($"全部存款汇总:{total}"); Console.WriteLine("回车继续发一批,退出按E"); if (Console.ReadLine() == "E") { break; } } await scheduler.Shutdown(); static IAccountActor CreateActor(ActorProxyFactory factory, string accountNo) { var actorType = "AccountActor"; var actorId = new ActorId(accountNo); return factory.CreateActorProxy<IAccountActor>(actorId, actorType); } static async Task<IScheduler> RunJobAsync() { var factory = new StdSchedulerFactory(); var scheduler = await factory.GetScheduler(); await scheduler.Start(); var job = JobBuilder.Create<SavaAccountJob>() .WithIdentity("SavaAccountJob", "SavaAccountGroup") .Build(); var trigger = TriggerBuilder.Create() .WithIdentity("SavaAccountTrigger", "SavaAccountGroup") .StartNow() .WithSimpleSchedule(x => x .WithIntervalInSeconds(10) .RepeatForever()) .Build(); await scheduler.ScheduleJob(job, trigger); return scheduler; } class SavaAccountJob : IJob { static decimal total = 0; public async Task Execute(IJobExecutionContext context) { var accountNo = "808080808080808080"; var actorType = "AccountActor"; var actorId = new ActorId(accountNo); var factory = new ActorProxyFactory(new ActorProxyOptions { HttpEndpoint = "http://localhost:3999" }); var account = factory.CreateActorProxy<IAccountActor>(actorId, actorType); var balance = await account.ChargeAsync(0m); total += balance; var newBalance = account.ChargeAsync(-balance).Result; Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"账户:{accountNo} 处理余额:{balance} 定时处理完后余额:{newBalance} 总体余额:{total}"); Console.ResetColor(); } }
测试时,不停的按回车,一段时间后查看“全部存款汇总”和后台任务处理的“总体余额”是否相等,相等的话说明多批次存款和多批次保存余额的数值相等,没有丢失败数据。
本质上,这里是把数据库的行级锁,转换成了调用方法的串行(方法里缓存计算数据)化。
想要更快更方便的了解相关知识,可以关注微信公众号
标签:Console,app,Actor,Dapr,new,var,using,NET,balance 来源: https://www.cnblogs.com/ljknlb/p/16191913.html