其他分享
首页 > 其他分享> > 通过Z-Order技术加速Hudi大规模数据集分析方案

通过Z-Order技术加速Hudi大规模数据集分析方案

作者:互联网

欢迎微信公众号:ApacheHudi

1. 背景

多维分析是大数据分析的一个典型场景,这种分析一般带有过滤条件。对于此类查询,尤其是在高基字段的过滤查询,理论上只我们对原始数据做合理的布局,结合相关过滤条件,查询引擎可以过滤掉大量不相关数据,只需读取很少部分需要的数据。例如我们在入库之前对相关字段做排序,这样生成的每个文件相关字段的min-max值是不存在交叉的,查询引擎下推过滤条件给数据源结合每个文件的min-max统计信息,即可过滤掉大量不相干数据。 上述技术即我们通常所说的data clustering 和 data skip。直接排序可以在单个字段上产生很好的效果,如果多字段直接排序那么效果会大大折扣的,Z-Order可以较好的解决多字段排序问题。

本文基于Apache Spark 以及 Apache Hudi 结合Z-order技术介绍如何更好的对原始数据做布局, 减少不必要的I/O,进而提升查询速度。具体提案可参考Hudi RFC-28:Support Z-order curve

2. Z-Order介绍

Z-Order是一种可以将多维数据压缩到一维的技术,在时空索引以及图像方面使用较广。Z曲线可以以一条无限长的一维曲线填充任意维度的空间,对于数据库的一条数据来说,我们可以将其多个要排序的字段看作是数据的多个维度,z曲线可以通过一定的规则将多维数据映射到一维数据上,构建z-value 进而可以基于该一维数据进行排序。z-value的映射规则保证了排序后那些在多维维度临近的数据在一维曲线上仍然可以彼此临近。

wiki定义:假设存在一个二维坐标对(x, y),这些坐标对于于一个二维平面上,使用Z排序,我们可以将这些坐标对压缩到一维。

当前在delta lake的商业版本实现了基于Z-Order的data Clustering技术,开源方面Spark/Hive/Presto 均未有对Z-Order的支持。

3. 具体实现

我们接下来分2部分介绍如何在Hudi中使用Z-Order:

  1. z-value的生成和排序
  2. 与Hudi结合

3.1 z-value的生成和排序

这部分是Z-Order策略的核心,这部分逻辑是公用的,同样适用其他框架。

Z-Order的关键在于z-value的映射规则。wiki上给出了基于位交叉的技术,每个维度值的比特位交叉出现在最终的z-value里。例如假设我们想计算二维坐标(x=97, y=214)的z-value,我们可以按如下步骤进行

第一步:将每一维数据用bits表示

x value:01100001
y value:11010110

第二步:从y的最左侧bit开始,我们将x和y按位做交叉,即可得到z 值,如下所示

z-value: 1011011000101001

对于多维数据,我们可以采用同样的方法对每个维度的bit位做按位交叉形成 z-value,一旦我们生成z-values 我们即可用该值做排序,基于z值的排序自然形成z阶曲线对多个参与生成z值的维度都有良好的聚合效果。

上述生成z-value的方法看起来非常好,但在实际生产环境上我们要使用位交叉技术产生z-value 还需解决如下问题:

  1. 上述介绍是基于多个unsigned int类型的递增数据,通过位交叉生成z-value的。实际上的数据类型多种多样,如何处理其他类型数据
  2. 不同类型的维度值转成bit位表示,长度不一致如何处理
  3. 如何选择数据类型合理的保存z-value,以及相应的z值排序策略

针对上述问题,我们采用两种策略生成z值。

3.1.1 基于映射策略的z值生成方法

第一个问题:对不同的数据类型采用不同的转换策略

十进制二进制
00000 0000
10000 0001
20000 0010
1260111 1110
1270111 1111
-1281000 0000
-1271000 0001
-1261000 0010
-21111 1110
-11111 1111

对于这个问题,我们可以直接将二进制的最高位反转,就可以保证转换后的词典顺序和原值相同。如下图

十进制二进制最高位反转最高位反转后十进制
00000 00001000 0000128
10000 00011000 0001129
20000 00101000 0010130
1260111 11101111 1110254
1270111 11111111 1111255
-1281000 00000000 00000
-1271000 00010000 00011
-1261000 00100000 00102
-21111 11100111 1110126
-11111 11110111 1111127

第二个问题:生成的二进制值统一按64位对齐即可

第三个问题:可以用Array[Byte]来保存z值(参考Amazon的DynamoDB 可以限制该数组的长度位1024)。对于 Array[Byte]类型的数据排序,hbase的rowkey 排序器可以直接拿来解决这个问题

基于映射策略的z值生成方法,方便快捷很容易理解,但是有一定缺陷:

  1. 参与生成z-value的字段理论上需要是从0开始的正整数,这样才能生成很好的z曲线。 真实的数据集中 是不可能有这么完美的情况出现的, zorder的效果将会打折扣。比如x 字段取值(0, 1, 2), y字段取值(100, 200, 300), 用x, y生成的z-value只是完整z曲线的一部分,对其做z值排序的效果和直接用x排序的效果是一样的; 再比如x的基数值远远低于y的基数值时采用上述策略排序效果基本和按y值排序是一样的,真实效果还不如先按x排序再按y排序。

  2. String类型的处理, 上述策略对string类型是取前8个字节的参与z值计算, 这将导致精度丢失。 当出现字符串都是相同字符串前缀的情况就无法处理了,比如"https://www.baidu.com" , “https://www.google.com” 这两个字符串前8个字节完全一样, 对这样的数据截取前8个字节参与z值计算没有任何意义。

上述策略出现缺陷的主要原因是数据的分布并不总是那么好导致。有一种简单的方案可以解决上述问题: 对参与z值计算的所有维度值做全局Rank,用Rank值代替其原始值参与到z值计算中,由于Rank值一定是从0开始的正整数,完全符合z值构建条件,较好的解决上述问题。 在实验中我们发现这种用Rank值的方法确实很有效,但是z值生成效率极低,计算引擎做全局Rank的代价是非常高的,基于Rank的方法效率瓶颈在于要做全局Rank计算,那么我们可不可以对原始数据做采样减少数据量,用采样后的数据计算z值呢,答案是肯定的。

/** Generates z-value*/

val newRDD = df.rdd.map { row =>
  val values = zFields.map { case (index, field) =>
    field.dataType match {
      case LongType =>
        ZOrderingUtil.longTo8Byte(row.getLong(index))
      case DoubleType =>
        ZOrderingUtil.doubleTo8Byte(row.getDouble(index))
      case IntegerType =>
        ZOrderingUtil.intTo8Byte(row.getInt(index))
      case FloatType =>
        ZOrderingUtil.doubleTo8Byte(row.getFloat(index).toDouble)
      case StringType =>
        ZOrderingUtil.utf8To8Byte(row.getString(index))
      case DateType =>
        ZOrderingUtil.longTo8Byte(row.getDate(index).getTime)
      case TimestampType =>
        ZOrderingUtil.longTo8Byte(row.getTimestamp(index).getTime)
      case ByteType =>
        ZOrderingUtil.byteTo8Byte(row.getByte(index))
      case ShortType =>
        ZOrderingUtil.intTo8Byte(row.getShort(index).toInt)
      case d: DecimalType =>
        ZOrderingUtil.longTo8Byte(row.getDecimal(index).longValue())
      case _ =>
        null
    }
  }.filter(v => v != null).toArray
  val zValues = ZOrderingUtil.interleaveMulti8Byte(values)
  Row.fromSeq(row.toSeq ++ Seq(zValues))
}.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)))

3.1.2 基于RangeBounds的z-value生成策略

在介绍基于RangeBounds的z-value生成策略之前先看看Spark的排序过程,Spark排序大致分为2步

  1. 对输入数据的key做sampling来估计key的分布,按指定的分区数切分成range并排序。计算出来的rangeBounds是一个长度为numPartition - 1 的数组,该数组里面每个元素表示一个分区内key值的上界/下界。
  2. shuffle write 过程中,每个输入的key应该分到哪个分区内,由第一步计算出来的rangeBounds来确定。每个分区内的数据虽然没有排序,但是注意rangeBounds是有序的因此分区之间宏观上看是有序的,故只需对每个分区内数据做好排序即可保证数据全局有序。

参考Spark的排序过程,我们可以这样做

  1. 对每个参与Z-Order的字段筛选规定个数(类比分区数)的Range并对进行排序,并计算出每个字段的RangeBounds;
  2. 实际映射过程中每个字段映射为该数据所在rangeBounds的中的下标,然后参与z-value的计算。可以看出由于区间下标是从0开始递增的正整数,完全满足z值生成条件;并且String类型的字段映射问题也被一并解决了。基于RangeBounds的z值生成方法,很好的解决了第一种方法所面临的缺陷。由于多了一步采样生成RangeBounds的过程,其效率显然不如第一种方案,我们实现了上述两种z值生成方法以供选择。
/** Generates z-value */

val indexRdd = internalRdd.mapPartitionsInternal { iter =>
  val bounds = boundBroadCast.value
  val origin_Projections = sortingExpressions.map { se =>
    UnsafeProjection.create(Seq(se), outputAttributes)
  }

  iter.map { unsafeRow =>
    val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map { case ((rowProject, lazyOrdering), index) =>
      val row = rowProject(unsafeRow)
      val decisionBound = new DecisionBound(sampleRdd, lazyOrdering)
      if (row.isNullAt(0)) {
        bounds(index).length + 1
      } else {
        decisionBound.getBound(row, bounds(index).asInstanceOf[Array[InternalRow]])
      }
    }.toArray.map(ZOrderingUtil.toBytes(_))
    val zValues = ZOrderingUtil.interleaveMulti4Byte(interleaveValues)
    val mutablePair = new MutablePair[InternalRow, Array[Byte]]()

    mutablePair.update(unsafeRow, zValues)
  }
}.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)

3.2 与Hudi结合

与Hudi的结合大致分为两部分

3.2.1 表数据的Z排序重组

这块相对比较简单,借助Hudi内部的Clustering机制结合上述z值的生成排序策略我们可以直接完成Hudi表数据的数据重组,这里不再详细介绍。

3.2.2 收集保存统计信息

这块其实RFC27已经在做了,感觉有点重复工作我们简单介绍下我们的实现,数据完成z重组后,我们需要对重组后的每个文件都收集参与z值计算的各个字段的min/max/nullCount 的统计信息。对于统计信息收集,可以通过读取Parquet文件或者通过SparkSQL收集

/** collect statistic info*/

val sc = df.sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(conf)
val numParallelism = inputFiles.size/3
val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
try {
  val description = s"Listing parquet column statistics"
  sc.setJobDescription(description)
  sc.parallelize(inputFiles, numParallelism).mapPartitions { paths =>
    val hadoopConf = serializableConfiguration.value
    paths.map(new Path(_)).flatMap { filePath =>
      val blocks = ParquetFileReader.readFooter(hadoopConf, filePath).getBlocks().asScala
      blocks.flatMap(b => b.getColumns().asScala.
        map(col => (col.getPath().toDotString(),
          FileStats(col.getStatistics().minAsString(), col.getStatistics().maxAsString(), col.getStatistics.getNumNulls.toInt))))
        .groupBy(x => x._1).mapValues(v => v.map(vv => vv._2)).
        mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max, value.map(_.num_nulls).max)).toSeq.
        map(x => ColumnFileStats(filePath.getName(), x._1, x._2.minVal, x._2.maxVal, x._2.num_nulls))
    }.filter(p => cols.contains(p.colName))
  }.collect()
} finally {
  sc.setJobDescription(previousJobDescription)
}
/** collect statistic info*/

val inputFiles = df.inputFiles
val conf = df.sparkSession.sparkContext.hadoopConfiguration
val values = cols.flatMap(c => Seq( min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCount")))
val valueCounts = count("*").as("totalNum")
val projectValues = Seq(col("file")) ++ cols.flatMap(c =>
  Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))


val result = df.select(input_file_name() as "file", col("*"))
  .groupBy($"file")
  .agg(valueCounts,  values: _*).select(projectValues:_*)
result

之后将这些信息保存在Hudi表里面的hoodie目录下的index目录下,然后供Spark查询使用。

3.2.3 应用到Spark查询

为将统计信息应用Spark查询,需修改HudiIndex的文件过滤逻辑,将DataFilter转成对Index表的过滤,选出候选要读取的文件,返回给查询引擎,具体步骤如下。

  1. 将索引表加载到 IndexDataFrame
  2. 使用原始查询过滤器为 IndexDataFrame 构建数据过滤器
  3. 查询 IndexDataFrame 选择候选文件
  4. 使用这些候选文件来重建 HudiMemoryIndex

通过min/max值和null计数信息为 IndexDataFrame 构建数据过滤器,由于z排序后参与z值计算的各个字段在每个文件里面的min/max值很大概率不交叉,因此对Index表的过滤可以过滤掉大量的文件。

/** convert filter */

def createZindexFilter(condition: Expression): Expression = {

  val minValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_minValue").expr
  val maxValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_maxValue").expr
  val num_nulls = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_num_nulls").expr

  condition match {
    case EqualTo(attribute: AttributeReference, value: Literal) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case EqualTo(value: Literal, attribute: AttributeReference) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))

    case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(equalNullSafe.left)
      EqualTo(num_nulls(colName), equalNullSafe.right)
.......

4. 测试结果

我们采用databrick的测试样例https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.53c258ccmqvYfy 进行了测试

测试数据量和资源使用大小和databrick保持一致。唯一区别是我们只生成了10000个文件,原文是100w个文件。 测试结果表明zorder加速比还说很可观的,另外Z-Order的效果随着文件数的增加会越来越好,我们后续也会在100w文件级别测试。

表名称时间(s)
conn_random_parquet89.3
conn_zorder19.4
conn_zorder_only_ip18.2

标签:case,Hudi,val,colName,value,加速,排序,Order,row
来源: https://blog.csdn.net/weixin_45914070/article/details/118074353