其他分享
首页 > 其他分享> > HBase-Compact-PressureAwareCompactionThroughputController分析

HBase-Compact-PressureAwareCompactionThroughputController分析

作者:互联网

PressureAwareCompactionThroughputController 里的 tuneChore 会定期调用 server.getCompactionPressure(),
并根据返回的压力值重新计算 compaction 的限流阈值。


server.getCompactionPressure() 会遍历本 server 上所有 region 的所有 store,取各个 store.getCompactionPressure() 中的最大值。


store.getCompactionPressure如何计算
  /**
   * Returns this store's compaction pressure by delegating to the store file manager
   * obtained from the store engine.
   *
   * @return whatever the store file manager's {@code getCompactionPressure()} reports
   */
  public double getCompactionPressure() {
    return storeEngine.getStoreFileManager().getCompactionPressure();
  }


  @Override
  public double getCompactionPressure() {
    int storefileCount = getStorefileCount();   //文件数量
    int minFilesToCompact = comConf.getMinFilesToCompact();    "hbase.hstore.compaction.min"  旧配置 “hbase.hstore.compactionThreshold” 未配置的为3
    if (storefileCount <= minFilesToCompact) {
      return 0.0;
    }


    //blockingFileCount = "hbase.hstore.blockingStoreFiles" 默认为7
    //这个公式含义:  现在可compact的文件数是否超过需要blocking的数量


    return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
  }





  /**
   * Picks a new compaction throughput limit for the given pressure and stores it in
   * {@code maxThroughput}.
   *
   * <ul>
   *   <li>pressure above 1.0: the limit becomes {@code Double.MAX_VALUE} (effectively
   *       unlimited);</li>
   *   <li>otherwise, during off-peak hours: the off-peak limit is used;</li>
   *   <li>otherwise: the limit is interpolated linearly between the lower and higher
   *       bounds according to the pressure.</li>
   * </ul>
   *
   * @param compactionPressure the pressure value the limit is derived from
   */
  private void tune(double compactionPressure) {
    double newLimit;
    if (compactionPressure > 1.0) {
      // Some store has already gone past its blocking file count: stop throttling.
      newLimit = Double.MAX_VALUE;
    } else {
      // Off-peak hours use the dedicated off-peak limit; otherwise scale linearly with
      // the pressure between the lower and higher bounds.
      newLimit = offPeakHours.isOffPeakHour()
          ? maxThroughputOffpeak
          : maxThroughputLowerBound
              + (maxThroughputHigherBound - maxThroughputLowerBound) * compactionPressure;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
          + throughputDesc(newLimit));
    }
    this.maxThroughput = newLimit;
  }



//=====================================================================================================


  /**
   * Throttles the named compaction: records {@code size} additional units of work and, if
   * the compaction has progressed faster than its share of {@code maxThroughput} allows,
   * sleeps long enough to bring it back under the limit.
   *
   * @param compactionName key used to look up the compaction's state in
   *     {@code activeCompactions}
   * @param size amount to add to the compaction's running total (presumably bytes --
   *     confirm against the caller)
   * @return the number of milliseconds slept, or {@code 0} if no sleep was needed
   * @throws InterruptedException if the thread is interrupted during {@code Thread.sleep}
   */
  @Override
  public long control(String compactionName, long size) throws InterruptedException {
    ActiveCompaction compaction = activeCompactions.get(compactionName);
    // Accumulate the total size handled by this compaction so far.
    compaction.totalSize += size;
    // Amount handled since the last throttling checkpoint.
    long deltaSize = compaction.totalSize - compaction.lastControlSize;

    // Too little progress since the last checkpoint (less than controlPerSize): skip the
    // throttling check and return immediately without updating the checkpoint.
    if (deltaSize < controlPerSize) {
      return 0;
    }
    long now = EnvironmentEdgeManager.currentTimeMillis();

    // Per-compaction share = overall compaction throughput limit / number of active compactions.
    double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();

    // Minimum time this delta should have taken at the allowed rate, converted to
    // milliseconds (the * 1000 suggests the limit is expressed per second -- confirm).
    long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms

    // Time elapsed since the last checkpoint; the size checkpoint is advanced here.
    long elapsedTime = now - compaction.lastControlTime;
    compaction.lastControlSize = compaction.totalSize;

    // Enough time has already passed for this delta: no sleep needed, just move the
    // time checkpoint forward.
    if (elapsedTime >= minTimeAllowed) {
      compaction.lastControlTime = EnvironmentEdgeManager.currentTimeMillis();
      return 0;
    }


    // Elapsed time is shorter than the minimum allowed time, i.e. the compaction is too fast.
    // Sleep for the remainder: minimum allowed time minus the time already elapsed.
    long sleepTime = minTimeAllowed - elapsedTime;
    if (LOG.isDebugEnabled()) {
      // do not log too much: at most one message per 60 seconds for this compaction
      if (now - compaction.lastLogTime > 60L * 1000) {
        LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
            + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
            + throughputDesc(maxThroughputPerCompaction) + ", already slept "
            + compaction.numberOfSleeps + " time(s) and total slept time is "
            + compaction.totalSleepTime + " ms till now.");
        compaction.lastLogTime = now;
      }
    }
    Thread.sleep(sleepTime);
    // Update sleep statistics and move the time checkpoint to after the sleep.
    compaction.numberOfSleeps++;
    compaction.totalSleepTime += sleepTime;
    compaction.lastControlTime = EnvironmentEdgeManager.currentTimeMillis();
    return sleepTime;
  }



















 

gloria_y 发布了52 篇原创文章 · 获赞 4 · 访问量 5万+ 私信 关注

标签:Compact,PressureAwareCompactionThroughputController,getCompactionPressure,sleepT
来源: https://blog.csdn.net/Gloria_y/article/details/104083450