
HUDI preCombinedField Summary (Part 2): Source Code Analysis


Preface

In the previous post, HUDI preCombinedField Summary, I already summarized preCombinedField once. At the time my understanding of the source code was not deep enough and the analysis was incomplete; now that I understand the source code better, this post adds to and completes that summary.

The historical comparison value

In the previous summary:

DF: the new record overwrites and updates the existing one regardless of whether its ts value is greater than the historical record's ts value.
SQL: when writing, the record is updated only if the new ts value is greater than or equal to the historical ts value; if it is smaller, no update happens.

Here is why. First, the default value of PAYLOAD_CLASS_NAME for Spark SQL is ExpressionPayload, and ExpressionPayload extends DefaultHoodieRecordPayload:

class ExpressionPayload(record: GenericRecord,
                        orderingVal: Comparable[_])
  extends DefaultHoodieRecordPayload(record, orderingVal) {

needUpdatingPersistedRecord in DefaultHoodieRecordPayload implements the comparison against the historical value; the concrete implementation is analyzed below.

Spark DF, on the other hand, uses OverwriteWithLatestAvroPayload as the default PAYLOAD_CLASS_NAME in Hudi 0.9.0; it is the parent class of DefaultHoodieRecordPayload and does not implement the comparison against the historical value.
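If you want Spark DF writes to compare against the historical ts before updating, like Spark SQL does, you can set the payload class explicitly. A minimal sketch, assuming a DataFrame df with columns id, name and ts; the table name and path are made-up examples:

// made-up table name, path and column names; switching the payload class from the
// DF default (OverwriteWithLatestAvroPayload) to DefaultHoodieRecordPayload enables
// the historical-value comparison on update
df.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
  option("hoodie.table.name", "test_hudi_table").
  mode("append").
  save("/tmp/test_hudi_table")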

How the historical value comparison is implemented

Let's walk through the source code briefly. First, the configuration key for the historical comparison field is:

 HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY = "hoodie.payload.ordering.field"

Its default value is ts. So the ordering field and preCombineField are not the same configuration, but because they share the same default value and both are implemented in the PAYLOAD_CLASS, they feel like one and the same; that is why they are summarized together here.

HoodieMergeHandle

When Hudi merges small files during an upsert, it reaches the write method of HoodieMergeHandle:

/**
   * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
   */
  public void write(GenericRecord oldRecord) {
    // record key of the existing (old) record
    String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
    boolean copyOldRecord = true;
    if (keyToNewRecords.containsKey(key)) { // if the incoming batch contains this key, run the merge logic
      // If we have duplicate records that we are updating, then the hoodie record will be deflated after
      // writing the first record. So make a copy of the record to be merged
      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
      try {
        // this calls combineAndGetUpdateValue of the PAYLOAD_CLASS
        Option<IndexedRecord> combinedAvroRecord =
            hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
              useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
                config.getPayloadConfig().getProps());

        if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
          // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
          copyOldRecord = true;
        } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
          /*
           * ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
           * write the combined new
           * value
           *
           * We no longer need to copy the old record over.
           */
          copyOldRecord = false;
        }
        writtenRecordKeys.add(key);
      } catch (Exception e) {
        throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
            + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
      }
    }

    if (copyOldRecord) {
      // this should work as it is, since this is an existing record
      try {
        fileWriter.writeAvro(key, oldRecord);
      } catch (IOException | RuntimeException e) {
        String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
                key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
        LOG.debug("Old record is " + oldRecord);
        throw new HoodieUpsertException(errMsg, e);
      }
      recordsWritten++;
    }
  }

The combineAndGetUpdateValue method

Let's look at combineAndGetUpdateValue in DefaultHoodieRecordPayload:

 
  @Override
  /**
   * currentValue is the current value, i.e. the historical record already in storage. The call site is:
   * Option<IndexedRecord> combinedAvroRecord =
   *             hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
   *               useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
   *                 config.getPayloadConfig().getProps());
   */
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
    // recordBytes holds the bytes of the incoming (new) record
    if (recordBytes.length == 0) {
      return Option.empty();
    }
    // deserialize recordBytes into an Avro GenericRecord
    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

    // Null check is needed here to support schema evolution. The record in storage may be from old schema where
    // the new ordering column might not be present and hence returns null.
    // if the persisted record does not need updating, return the historical record
    if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
      return Option.of(currentValue);
    }

    /*
     * We reached a point where the value is disk is older than the incoming record.
     */
    eventTime = updateEventTime(incomingRecord, properties);

    /*
     * Now check if the incoming record is a delete record.
     */
    return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
  }
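To make the comparison behavior concrete, here is a minimal sketch (the schema and field values are made up) that calls combineAndGetUpdateValue directly: the incoming record has a smaller ts than the persisted one, so the persisted record is returned unchanged.

import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hudi.common.model.DefaultHoodieRecordPayload

// made-up schema with fields id, name, ts
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"Row","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"name","type":"string"},
    |  {"name":"ts","type":"long"}]}""".stripMargin)

val persisted = new GenericData.Record(schema)   // the record already in storage
persisted.put("id", "1"); persisted.put("name", "old"); persisted.put("ts", 2L)

val incoming = new GenericData.Record(schema)    // the new record, with a smaller ts
incoming.put("id", "1"); incoming.put("name", "new"); incoming.put("ts", 1L)

val props = new Properties()
props.setProperty("hoodie.payload.ordering.field", "ts")

val payload = new DefaultHoodieRecordPayload(incoming, java.lang.Long.valueOf(1L))
// persisted ts (2) > incoming ts (1), so needUpdatingPersistedRecord returns false
// and the persisted record is kept
val result = payload.combineAndGetUpdateValue(persisted, schema, props)
println(result.get())  // prints the persisted record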

recordBytes is assigned in the parent class BaseAvroPayload. When writing data we first construct a GenericRecord record, pass that record as a parameter to the payload, then build a List<HoodieRecord> and call HoodieJavaWriteClient.upsert(List<HoodieRecord> records, String instantTime):

  public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
    this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];
    this.orderingVal = orderingVal;
    if (orderingVal == null) {
      throw new HoodieException("Ordering value is null for record: " + record);
    }
  }
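A minimal sketch of that flow follows, under these assumptions: record is an Avro GenericRecord matching the table schema, ts is the preCombineField, and the record key and partition path values are made up:

import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.model.{HoodieKey, HoodieRecord, OverwriteWithLatestAvroPayload}

def toHoodieRecord(record: GenericRecord): HoodieRecord[OverwriteWithLatestAvroPayload] = {
  // value of the preCombineField, used as orderingVal
  val orderingVal = record.get("ts").asInstanceOf[Comparable[_]]
  // the two-argument constructor stores both recordBytes and orderingVal
  val payload = new OverwriteWithLatestAvroPayload(record, orderingVal)
  // "1" and "2021/09/01" are made-up record key / partition path values
  new HoodieRecord[OverwriteWithLatestAvroPayload](new HoodieKey("1", "2021/09/01"), payload)
}
// the resulting List<HoodieRecord> can then be passed to HoodieJavaWriteClient.upsert(records, instantTime)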

needUpdatingPersistedRecord

The comparison with the historical value happens here:

  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
                                                IndexedRecord incomingRecord, Properties properties) {
    /*
     * Combining strategy here returns currentValue on disk if incoming record is older.
     * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
     * or an insert/update record. In any case, if it is older than the record in disk, the currentValue
     * in disk is returned (to be rewritten with new commit time).
     *
     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
     * and need to be dealt with separately.
     */
    // ordering value (ts) of the persisted (historical) record
    Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
        properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
    // ordering value (ts) of the incoming record
    Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
        properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), false);
    // return true if the historical value is null or less than or equal to the new value,
    // meaning the historical record should be overwritten; otherwise do not update
    return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
  }

The default value of PAYLOAD_ORDERING_FIELD_PROP_KEY

As we saw above, the properties argument passed in HoodieMergeHandle is config.getPayloadConfig().getProps(). getPayloadConfig returns a HoodiePayloadConfig, and HoodiePayloadConfig defines the default value of PAYLOAD_ORDERING_FIELD_PROP_KEY as ts:

  public HoodiePayloadConfig getPayloadConfig() {
    return hoodiePayloadConfig;
  }

 public class HoodiePayloadConfig extends HoodieConfig {

  public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
      .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
      .defaultValue("ts")
      .withDocumentation("Table column/field name to order records that have the same key, before "
          + "merging and writing to storage."); 

How pre-combine is implemented

First of all, pre-combine is implemented in OverwriteWithLatestAvroPayload.preCombine:

public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
    implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

  public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0); // natural order
  }

  @Override
  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
    if (oldValue.recordBytes.length == 0) {
      // use natural order for delete record
      return this;
    }
    // if the old value's orderingVal is greater than the new one's, return the old value;
    // otherwise return the new value, i.e. keep the record with the larger ordering value
    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
      // pick the payload with greatest ordering value
      return oldValue;
    } else {
      return this;
    }
  }
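A minimal, self-contained sketch (made-up one-key schema and ts values) of how preCombine keeps the record with the larger ordering value:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload

// made-up schema, just enough to build two records with the same key
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"Row","fields":[
    |  {"name":"id","type":"string"},{"name":"ts","type":"long"}]}""".stripMargin)

val rec1 = new GenericData.Record(schema); rec1.put("id", "1"); rec1.put("ts", 3L)
val rec2 = new GenericData.Record(schema); rec2.put("id", "1"); rec2.put("ts", 5L)

val older = new OverwriteWithLatestAvroPayload(rec1, java.lang.Long.valueOf(3L))
val newer = new OverwriteWithLatestAvroPayload(rec2, java.lang.Long.valueOf(5L))

// preCombine keeps the payload with the greater ordering value, here `newer`,
// regardless of which side it is called on
assert(older.preCombine(newer) eq newer)
assert(newer.preCombine(older) eq newer)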

So both Spark SQL and Spark DF support pre-combine by default: ExpressionPayload and DefaultHoodieRecordPayload both extend OverwriteWithLatestAvroPayload, so any of these three payloads can perform pre-combine; the key is how the payload is constructed.

Constructing the payload

From the code above, OverwriteWithLatestAvroPayload has two constructors, one taking a single argument and one taking two. The single-argument constructor cannot perform pre-combine, because preCombine needs orderingVal for the comparison, so the two-argument constructor must be used: orderingVal is the value of the preCombineField and record is one row of data. Both Spark SQL and Spark DF eventually call HoodieSparkSqlWriter.write, and the payload is constructed inside that write method.

// Convert to RDD[HoodieRecord]
// first convert the DataFrame to an RDD of Avro GenericRecord
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
  org.apache.hudi.common.util.Option.of(schema))
// decide whether pre-combine is needed
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
  operation.equals(WriteOperationType.UPSERT) ||
  parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
    HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
  val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
  val hoodieRecord = if (shouldCombine) { // if pre-combine is needed
    // take the PRECOMBINE_FIELD value from the record; if it is missing, an exception is thrown,
    // because the pre-combine field must not be null
    val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
      .asInstanceOf[Comparable[_]]
    // then construct the payload configured by PAYLOAD_CLASS_NAME via reflection
    DataSourceUtils.createHoodieRecord(processedRecord,
      orderingVal, keyGenerator.getKey(gr),
      hoodieConfig.getString(PAYLOAD_CLASS_NAME))
  } else {
    // if pre-combine is not needed, also construct the payload via reflection, but without the orderingVal argument
    DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
  }
  hoodieRecord
}).toJavaRDD()

As the comments in the source above show, when pre-combine is needed, the PRECOMBINE_FIELD value orderingVal is first extracted from the record and then the payload is constructed, i.e.:

new OverwriteWithLatestAvroPayload(record, orderingVal)

The payload is now constructed; so where does the pre-combine actually happen?

Invoking preCombine

Let's take the upsert of a COW table as an example, i.e. HoodieJavaCopyOnWriteTable.upsert:

// HoodieJavaCopyOnWriteTable
  @Override
  public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
                                                       String instantTime,
                                                       List<HoodieRecord<T>> records) {
    return new JavaUpsertCommitActionExecutor<>(context, config,
        this, instantTime, records).execute();
  }

// JavaUpsertCommitActionExecutor
   @Override
  public HoodieWriteMetadata<List<WriteStatus>> execute() {
    return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
  }

// AbstractWriteHelper
public HoodieWriteMetadata<O> write(String instantTime,
                                      I inputRecords,
                                      HoodieEngineContext context,
                                      HoodieTable<T, I, K, O> table,
                                      boolean shouldCombine,
                                      int shuffleParallelism,
                                      BaseCommitActionExecutor<T, I, K, O, R> executor,
                                      boolean performTagging) {
    try {
      // De-dupe/merge if needed
      I dedupedRecords =
          combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);

      Instant lookupBegin = Instant.now();
      I taggedRecords = dedupedRecords;
      if (performTagging) {
        // perform index lookup to get existing location of records
        taggedRecords = tag(dedupedRecords, context, table);
      }
      Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());

      HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
      result.setIndexLookupDuration(indexLookupDuration);
      return result;
    } catch (Throwable e) {
      if (e instanceof HoodieUpsertException) {
        throw (HoodieUpsertException) e;
      }
      throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
    }
  }

    public I combineOnCondition(
      boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
    return condition ? deduplicateRecords(records, table, parallelism) : records;
  }


  /**
   * Deduplicate Hoodie records, using the given deduplication function.
   *
   * @param records     hoodieRecords to deduplicate
   * @param parallelism parallelism or partitions to be used while reducing/deduplicating
   * @return Collection of HoodieRecord already be deduplicated
   */
  public I deduplicateRecords(
      I records, HoodieTable<T, I, K, O> table, int parallelism) {
    return deduplicateRecords(records, table.getIndex(), parallelism);
  }


// SparkWriteHelper  
  @Override
  public JavaRDD<HoodieRecord<T>> deduplicateRecords(
      JavaRDD<HoodieRecord<T>> records, HoodieIndex<T, ?, ?, ?> index, int parallelism) {
    boolean isIndexingGlobal = index.isGlobal();
    return records.mapToPair(record -> {
      HoodieKey hoodieKey = record.getKey();
      // If index used is global, then records are expected to differ in their partitionPath
      // get the key of the record
      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
      // return (key, record)
      return new Tuple2<>(key, record);
    }).reduceByKey((rec1, rec2) -> {
      @SuppressWarnings("unchecked")
      // records with the same key go through preCombine, which returns the one with the larger preCombineField value
      T reducedData = (T) rec2.getData().preCombine(rec1.getData());
      HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();

      return new HoodieRecord<T>(reducedKey, reducedData);
    }, parallelism).map(Tuple2::_2);
  }     
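To see this end to end, here is a minimal sketch (column names, table name and path are made up) in which two rows of the same batch share a record key; after deduplicateRecords only the row with the larger ts value is written:

// assumes an existing SparkSession named spark with the Hudi Spark bundle on the classpath
import spark.implicits._

val batch = Seq(("1", "a", 10L), ("1", "b", 20L)).toDF("id", "name", "ts")  // same id, different ts

batch.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.table.name", "test_hudi_table").
  mode("append").
  save("/tmp/test_hudi_table")

// reading the table back returns a single row for id = 1, the one with name = "b" (ts = 20)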

This is how the pre-combine feature is implemented.

Changing the historical comparison field

Finally, let's look at how the historical comparison field can be changed. In fact, Spark SQL and Spark DF do not need to change it explicitly, because by default it is kept in sync with the preCombineField. Let's see how the program keeps them in sync. Both SQL and DF eventually call HoodieSparkSqlWriter.write:

// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
  null, path, tblName,
  mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
  .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

  public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
                                                       String tblName, Map<String, String> parameters) {
    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
  }

  public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
      String tblName, Map<String, String> parameters) {
    boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
    boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
        .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
    boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
    boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE().key()));
    // insert/bulk-insert combining to be true, if filtering for duplicates
    boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
        .withPath(basePath).withAutoCommit(false).combineInput(combineInserts, true);
    if (schemaStr != null) {
      builder = builder.withSchema(schemaStr);
    }

    return builder.forTable(tblName)
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
            .withInlineCompaction(inlineCompact).build())
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withInlineClustering(inlineClusteringEnabled)
            .withAsyncClustering(asyncClusteringEnabled).build())
         // here the payload ordering field is set to the value of PRECOMBINE_FIELD, so by default it changes together with PRECOMBINE_FIELD
        .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
            .build())
        // override above with Hoodie configs specified as options.
        .withProps(parameters).build();
  }  

If you do want to change the default, i.e. make it differ from PRECOMBINE_FIELD:

SQL:

set hoodie.payload.ordering.field=ts;

DF:

.option("hoodie.payload.ordering.field", "ts")
or
.option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts")
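For example, a minimal DF sketch (the column name update_time is made up) where the pre-combine field and the ordering field are deliberately different columns; the option overrides the derived default because, as the comment in createHoodieConfig above notes, user options are applied last:

// made-up example: pre-combine within the batch uses ts, while the comparison
// with the record already in storage uses update_time
df.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.payload.ordering.field", "update_time").
  option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
  option("hoodie.table.name", "test_hudi_table").
  mode("append").
  save("/tmp/test_hudi_table")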

Source: https://blog.csdn.net/dkl12/article/details/123070348