其他分享
首页 > 其他分享> > 两种全表扫描的思路

两种全表扫描的思路

作者:互联网

日常的工作中,可能需要对分片表进行全表扫描,这里介绍两种并发全表扫描的方法:

思路1:分片分页读取+并发请求

两个for循环,外层for循环遍历每个分片,内层for循环并发处理这些数据。整个处理过程可分为数据获取和并发请求两部分,两部分串行执行,先获取数据,再并发处理这些数据。

数据获取

a、外层循环遍历每个分片

b、每个分片内分页读取数据

// 初试化分片为0
   minId := int64(0)
   // 遍历分片
   for shardId := int64(0); shardId < info.getShardingCount(); {
      // 获取数据
      data, err := info.PageQueryData(shardId, PageSize, minId)
      
      // 并发处理 此处省略
      // ...
     
      // 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据
      if len(data) < PageSize {
         shardId++
         minId = 0
      }
   }

并发处理

a、设置最大并发goroutine数量n

b、通过buffersize = n的channel控制并发goroutine数量:每开启一个goroutine向channel中插入一条数据,每个goroutine结束后从channel取出一条数据

c、通过sync.WaitGroup等待所有goroutine执行完后才读取下一批数据

   // 初试化分片为0
   minId := int64(0)
   // 遍历分片
   for shardId := int64(0); shardId < info.getShardingCount(); {
      // 获取数据
      data, err := info.BatchGetData(shardId, PageSize, minId)
      
      // 并发处理
      for _, item := range data {
         info.waitGroup.Add(1)
         // 向channel插入数据 
         info.concurrencyCtlChannel <- 1
         if item.Id > minId {
            minId = item.Id
         }
         go func(ctx context.Context, data *Data) {
            defer func() {
               if err := recover(); err != nil {
                  logs.CtxError(ctx, "Task: task execute error. err: %v", err)
               }
               // 运行完后从channel读取数据
               <-info.concurrencyCtlChannel
               info.waitGroup.Done()
            }()
            // 业务处理 此处省略
            // ...
         }(ctx, item)
      }
      // 等待data全部处理完
      info.waitGroup.Wait()
     
      // 如果获取到的数据长度小于每页数量,则表明数据读取完,分片数+1,否则继续读取该分片数据
      if len(data) < PageSize {
         shardId++
         minId = 0
      }
   }

 思路2:并发读取+并发处理(生产者-消费者模型)

a、创建一组生产者goroutine和消费者goroutine,以及一组传递消息的channel,不同的生产者goroutine从不同的分片中分页获取数据,向channel中插入数据,消费者goroutine从channel中获取数据进行消费。

b、另外通过一个channel控制生产者goroutine的数量,用一个输入参数控制消费者goroutine的数量。

c、使用sync.WaitGroup等待生产者生产完数据后关闭管道,等待消费者消费完数据后结束任务。

并发读取

func ProduceData() {
    for i = minShardingKey; i <= maxShardingKey; i++ {
        channel := channelGroup[i % channelSize]
        pwg.Add(1)
        // 控制produce goroutine数量
        produceChannel <- 1
        go func(ch chan *db.Data) {
            defer func() {
                t.pwg.Done()
                <- produceChannel
            }
             minId := int64(1)
             for {
                // 分页从数据库读取数据
                data, err := db.GetData(shardingKey, minId, PageSize)
                // 向channel生产数据
                for _, order := range data {
                   if order.Id > minId {
                      minId = order.Id
                   }
                   // 如果channel已满则阻塞在此等待消费goroutine消费channel中的数据
                   ch <- order
             }
             minId++
             if len(orderInfos) < PageSize {
                 break
             }
         }
      }(channel)
    }
}

 并发处理

func ConsumeData() {
    for i = 0; i < consumerGouroutineSize; i++ {
        cwg.Add(1);
        ch := channelGroup[i & channelSize]
        go func(ch chan *db.Data) {
            defer func() {
                cwg.Done()
            }
            for order := range ch {
                // do business
            }
         }
      }(channel)
    }
}

管道关闭

func WaidAndClose() {
    // 等所有生产者goroutine结束
    pwg.Wait()
    // 关闭管道 
    for  {
        //...
    }
    // 等到消费完毕
    cwg.wait()
    
    // 结束任务
    return nil
}

标签:goroutine,扫描,minId,shardId,并发,全表,分片,思路,channel
来源: https://www.cnblogs.com/jiefang1874/p/16128670.html