两种全表扫描的思路
作者:互联网
日常的工作中,可能需要对分片表进行全表扫描,这里介绍两种并发全表扫描的方法:
思路1:分片分页读取+并发请求
两个for循环,外层for循环遍历每个分片,内层for循环并发处理这些数据。整个处理过程可分为数据获取和并发请求两部分,两部分串行执行,先获取数据,再并发处理这些数据。数据获取
a、外层循环遍历每个分片
b、每个分片内分页读取数据
// 初试化分片为0
minId := int64(0)
// 遍历分片
for shardId := int64(0); shardId < info.getShardingCount(); {
// 获取数据
data, err := info.PageQueryData(shardId, PageSize, minId)
// 并发处理 此处省略
// ...
// 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据
if len(data) < PageSize {
shardId++
minId = 0
}
}
并发处理
a、设置最大并发goroutine数量n
b、通过buffersize = n的channel控制并发goroutine数量:每开启一个goroutine向channel中插入一条数据,每个goroutine结束后从channel取出一条数据
c、通过sync.WaitGroup等待所有goroutine执行完后才读取下一批数据
// 初试化分片为0
minId := int64(0)
// 遍历分片
for shardId := int64(0); shardId < info.getShardingCount(); {
// 获取数据
data, err := info.BatchGetData(shardId, PageSize, minId)
// 并发处理
for _, item := range data {
info.waitGroup.Add(1)
// 向channel插入数据
info.concurrencyCtlChannel <- 1
if item.Id > minId {
minId = item.Id
}
go func(ctx context.Context, data *Data) {
defer func() {
if err := recover(); err != nil {
logs.CtxError(ctx, "Task: task execute error. err: %v", err)
}
// 运行完后从channel读取数据
<-info.concurrencyCtlChannel
info.waitGroup.Done()
}()
// 业务处理 此处省略
// ...
}(ctx, item)
}
// 等待data全部处理完
info.waitGroup.Wait()
// 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据
if len(data) < PageSize {
shardId++
minId = 0
}
}
思路2:并发读取+并发处理(生产者-消费者模型)
a、创建一组生产者goroutine和消费者goroutine,以及一组传递消息的channel,不同的生产者goroutine从不同的分片中分页获取数据,向channel中插入数据,消费者goroutine从channel中获取数据进行消费。
b、另外通过一个channel控制生产者goroutine的数量,用一个输入参数控制消费者goroutine的数量。
c、使用sync.WaitGroup等待生产者生产完数据后关闭管道,等待消费者消费完数据后结束任务。
并发读取
func ProduceData() {
for i = minShardingKey; i <= maxShardingKey; i++ {
channel := channelGroup[i % channelSize]
pwg.Add(1)
// 控制produce goroutine数量
produceChannel <- 1
go func(ch chan *db.Data) {
defer func() {
t.pwg.Done()
<- produceChannel
}
minId := int64(1)
for {
// 分页从数据库读取数据
data, err := db.GetData(shardingKey, minId, PageSize)
// 向channel生产数据
for _, order := range data {
if order.Id > minId {
minId = order.Id
}
// 如果channel已满则阻塞在此等待消费goroutine消费channel中的数据
ch <- order
}
minId++
if len(orderInfos) < PageSize {
break
}
}
}(channel)
}
}
并发处理
func ConsumeData() {
for i = 0; i < consumerGouroutineSize; i++ {
cwg.Add(1);
ch := channelGroup[i & channelSize]
go func(ch chan *db.Data) {
defer func() {
cwg.Done()
}
for order := range ch {
// do business
}
}
}(channel)
}
}
管道关闭
func WaidAndClose() {
// 等所有生产者goroutine结束
pwg.Wait()
// 关闭管道
for {
//...
}
// 等到消费完毕
cwg.wait()
// 结束任务
return nil
}
标签:goroutine,扫描,minId,shardId,并发,全表,分片,思路,channel 来源: https://www.cnblogs.com/jiefang1874/p/16128670.html