HUDI preCombinedField Summary (Part 2): Source Code Analysis
Preface
In the previous post, HUDI preCombinedField Summary, I already summarized preCombinedField once. At the time my understanding of the source code was not deep enough, so the analysis was incomplete. Now that I understand the source better, this post supplements that summary.
Historical comparison value
From the earlier summary:
DF: regardless of whether the new record's ts is greater than the historical record's ts, the write always overwrites and updates directly.
SQL: when writing, the record is updated only if its ts is greater than or equal to the historical ts; if it is smaller than the historical value, no update happens.
Here is why. First, the default value of PAYLOAD_CLASS_NAME for Spark SQL is ExpressionPayload, and ExpressionPayload extends DefaultHoodieRecordPayload:
class ExpressionPayload(record: GenericRecord,
orderingVal: Comparable[_])
extends DefaultHoodieRecordPayload(record, orderingVal) {
needUpdatingPersistedRecord in DefaultHoodieRecordPayload implements the comparison against the historical value; the concrete implementation is analyzed below.
For Spark DF in Hudi 0.9.0, the default value of PAYLOAD_CLASS_NAME is OverwriteWithLatestAvroPayload, which is the parent class of DefaultHoodieRecordPayload and does not implement the comparison against the historical value.
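So if you want the DF write path to also compare against the historical ts, one option is to switch the payload class. Below is a minimal sketch against the Spark Java API; the table name, path, and columns (id, ts) are assumptions for illustration, not taken from the original example:
// Assumes `df` is an existing Dataset<Row> with columns id and ts (hypothetical).
// Using DefaultHoodieRecordPayload makes the DF write compare ts against the
// value already persisted in storage, like the Spark SQL default does.
df.write().format("hudi")
  .option("hoodie.table.name", "test_hudi_table")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
  .mode("append")
  .save("/tmp/test_hudi_table");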
How the historical-value comparison is implemented
Let's briefly walk through the source. First, the config key for the historical comparison field is:
HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY = "hoodie.payload.ordering.field"
Its default value is ts. So the ordering field and preCombineField are not the same configuration, but because their defaults are the same and both are implemented in the PAYLOAD_CLASS, they feel like one thing, which is why they are summarized together here.
HoodieMergeHandle
During an upsert, when Hudi merges small files it goes through HoodieMergeHandle's write method:
/**
* Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
*/
public void write(GenericRecord oldRecord) {
// record key of the old (persisted) record
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
boolean copyOldRecord = true;
if (keyToNewRecords.containsKey(key)) { // if the incoming batch contains this key, run the merge logic
// If we have duplicate records that we are updating, then the hoodie record will be deflated after
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
try {
// this is where the PAYLOAD_CLASS's combineAndGetUpdateValue method is called
Option<IndexedRecord> combinedAvroRecord =
hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
config.getPayloadConfig().getProps());
if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
copyOldRecord = true;
} else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {
/*
* ONLY WHEN 1) we have an update for this key AND 2) We are able to successfully
* write the the combined new
* value
*
* We no longer need to copy the old record over.
*/
copyOldRecord = false;
}
writtenRecordKeys.add(key);
} catch (Exception e) {
throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
+ keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
}
}
if (copyOldRecord) {
// this should work as it is, since this is an existing record
try {
fileWriter.writeAvro(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
LOG.debug("Old record is " + oldRecord);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
}
}
The combineAndGetUpdateValue method
Let's look at DefaultHoodieRecordPayload's combineAndGetUpdateValue:
@Override
/**
* currentValue: the current value on storage, i.e. the historical record
* Option<IndexedRecord> combinedAvroRecord =
* hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
* useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
* config.getPayloadConfig().getProps());
*/
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
// recordBytes holds the bytes of the incoming (new) record
if (recordBytes.length == 0) {
return Option.empty();
}
// deserialize recordBytes into an Avro GenericRecord
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
// Null check is needed here to support schema evolution. The record in storage may be from old schema where
// the new ordering column might not be present and hence returns null.
// if the persisted record does not need updating (the incoming record is older), return the historical record
if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
return Option.of(currentValue);
}
/*
* We reached a point where the value is disk is older than the incoming record.
*/
eventTime = updateEventTime(incomingRecord, properties);
/*
* Now check if the incoming record is a delete record.
*/
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
}
recordBytes is assigned in the parent class BaseAvroPayload. When writing data we first build a GenericRecord, pass that record to the payload constructor, then build a List<HoodieRecord> and call HoodieJavaWriteClient.upsert(List<HoodieRecord> records, String instantTime):
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];
this.orderingVal = orderingVal;
if (orderingVal == null) {
throw new HoodieException("Ordering value is null for record: " + record);
}
}
needUpdatingPersistedRecord
The comparison against the historical value happens here:
protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
IndexedRecord incomingRecord, Properties properties) {
/*
* Combining strategy here returns currentValue on disk if incoming record is older.
* The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
* or an insert/update record. In any case, if it is older than the record in disk, the currentValue
* in disk is returned (to be rewritten with new commit time).
*
* NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
* and need to be dealt with separately.
*/
// ts value of the persisted (historical) record
Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
// ts value of the incoming record
Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), false);
// return true (i.e. overwrite and update) if the persisted value is null or less than or equal to the incoming value; otherwise keep the old record
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
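To make the <= 0 semantics concrete, here is a small standalone sketch that drives DefaultHoodieRecordPayload directly; the schema, values, and the OrderingFieldDemo class name are invented for illustration and are not part of Hudi. With an equal ts, the incoming record still replaces the persisted one:
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.util.Option;
public class OrderingFieldDemo {
  public static void main(String[] args) throws Exception {
    // hypothetical schema: a record key, a data column and the ts ordering column
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"demo\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"ts\",\"type\":\"long\"}]}");
    GenericRecord persisted = new GenericData.Record(schema); // the record already in storage
    persisted.put("id", "1");
    persisted.put("name", "old");
    persisted.put("ts", 100L);
    GenericRecord incoming = new GenericData.Record(schema);  // the new record being upserted
    incoming.put("id", "1");
    incoming.put("name", "new");
    incoming.put("ts", 100L); // equal ts: compareTo(...) <= 0, so the incoming record wins
    Properties props = new Properties();
    props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
    props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, "ts");
    DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(incoming, 100L);
    Option<IndexedRecord> merged = payload.combineAndGetUpdateValue(persisted, schema, props);
    // prints "new": with an equal ordering value the incoming record replaces the persisted one
    System.out.println(((GenericRecord) merged.get()).get("name"));
  }
}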
Default value of PAYLOAD_ORDERING_FIELD_PROP_KEY
As seen in HoodieMergeHandle above, the properties argument passed in is config.getPayloadConfig().getProps().
getPayloadConfig returns a HoodiePayloadConfig, and HoodiePayloadConfig defines the default value of PAYLOAD_ORDERING_FIELD_PROP_KEY as ts:
public HoodiePayloadConfig getPayloadConfig() {
return hoodiePayloadConfig;
}
public class HoodiePayloadConfig extends HoodieConfig {
public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
.key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
.defaultValue("ts")
.withDocumentation("Table column/field name to order records that have the same key, before "
+ "merging and writing to storage.");
Pre-combine implementation
First of all, the pre-combine logic is implemented in OverwriteWithLatestAvroPayload.preCombine:
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
}
public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
}
@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
if (oldValue.recordBytes.length == 0) {
// use natural order for delete record
return this;
}
// if the old value's orderingVal is greater than this record's orderingVal, return the old value; otherwise return this new value, i.e. keep the record with the larger orderingVal
if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
// pick the payload with greatest ordering value
return oldValue;
} else {
return this;
}
}
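A minimal hedged sketch of this comparison (the values and the PreCombineDemo class name are invented): two incoming payloads with the same key, where the one with the larger orderingVal survives preCombine:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
public class PreCombineDemo {
  public static void main(String[] args) {
    // hypothetical schema with just the record key and the preCombineField ts
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"demo\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"ts\",\"type\":\"long\"}]}");
    GenericRecord r1 = new GenericData.Record(schema);
    r1.put("id", "1");
    r1.put("ts", 1L);
    GenericRecord r2 = new GenericData.Record(schema);
    r2.put("id", "1");
    r2.put("ts", 2L);
    // orderingVal is the preCombineField value extracted from each record
    OverwriteWithLatestAvroPayload older = new OverwriteWithLatestAvroPayload(r1, 1L);
    OverwriteWithLatestAvroPayload newer = new OverwriteWithLatestAvroPayload(r2, 2L);
    // preCombine keeps the payload whose orderingVal is larger
    OverwriteWithLatestAvroPayload winner = newer.preCombine(older);
    System.out.println(winner == newer); // true: the record with ts = 2 survives
  }
}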
So both Spark SQL and Spark DF support pre-combine by default: ExpressionPayload and DefaultHoodieRecordPayload both extend OverwriteWithLatestAvroPayload, so any of these three payloads can perform pre-combine. The key is how the payload is constructed.
Constructing the payload
From the code above we can see that OverwriteWithLatestAvroPayload has two constructors, one taking a single argument and one taking two. The single-argument constructor cannot support pre-combine, because the preCombine method needs orderingVal for the comparison, so the two-argument constructor must be used, where orderingVal is the value of the preCombineField column and record is one row of data. Both Spark SQL and Spark DF eventually call HoodieSparkSqlWriter.write, and the payload is constructed inside this write method.
// Convert to RDD[HoodieRecord]
// first convert the df to an RDD[GenericRecord]
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
org.apache.hudi.common.util.Option.of(schema))
// decide whether pre-combine is needed
val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT) ||
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
val hoodieRecord = if (shouldCombine) { // if pre-combine is needed
// extract the PRECOMBINE_FIELD value from the record; if the value is missing, throw an exception, because the pre-combine field must not be null
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
.asInstanceOf[Comparable[_]]
// then construct the payload corresponding to PAYLOAD_CLASS_NAME via reflection
DataSourceUtils.createHoodieRecord(processedRecord,
orderingVal, keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS_NAME))
} else {
// if pre-combine is not needed, also construct the payload via reflection, but without the orderingVal argument
DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
}
hoodieRecord
}).toJavaRDD()
As the comments in the source above show, if pre-combine is needed, the PRECOMBINE_FIELD value orderingVal is first extracted from the record and then the payload is constructed, i.e.
new OverwriteWithLatestAvroPayload(record, orderingVal)
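As a hedged illustration of what the reflective DataSourceUtils.createHoodieRecord call boils down to for the default payload class (the schema, record key and partition path below are invented):
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
public class BuildRecordDemo {
  public static void main(String[] args) {
    // hypothetical one-row record with a ts column used as the PRECOMBINE_FIELD
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"demo\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"ts\",\"type\":\"long\"}]}");
    GenericRecord gr = new GenericData.Record(schema);
    gr.put("id", "1");
    gr.put("ts", 100L);
    // extract the PRECOMBINE_FIELD value and wrap the payload in a HoodieRecord,
    // which is what createHoodieRecord does via reflection
    Comparable orderingVal = (Comparable) gr.get("ts");
    HoodieKey key = new HoodieKey("1", "2021-01-01"); // recordKey, partitionPath
    HoodieRecord<OverwriteWithLatestAvroPayload> record =
        new HoodieRecord<>(key, new OverwriteWithLatestAvroPayload(gr, orderingVal));
    System.out.println(record.getRecordKey() + " -> " + record.getData());
  }
}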
Now the payload is constructed. So where is the pre-combine actually performed?
Calling preCombine
Take the upsert of a COW table as an example, i.e. HoodieJavaCopyOnWriteTable.upsert:
// HoodieJavaCopyOnWriteTable
@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> records) {
return new JavaUpsertCommitActionExecutor<>(context, config,
this, instantTime, records).execute();
}
// JavaUpsertCommitActionExecutor
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
}
// AbstractWriteHelper
public HoodieWriteMetadata<O> write(String instantTime,
I inputRecords,
HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
boolean shouldCombine,
int shuffleParallelism,
BaseCommitActionExecutor<T, I, K, O, R> executor,
boolean performTagging) {
try {
// De-dupe/merge if needed
I dedupedRecords =
combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);
Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
if (performTagging) {
// perform index loop up to get existing location of records
taggedRecords = tag(dedupedRecords, context, table);
}
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
result.setIndexLookupDuration(indexLookupDuration);
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert for commit time " + instantTime, e);
}
}
public I combineOnCondition(
boolean condition, I records, int parallelism, HoodieTable<T, I, K, O> table) {
return condition ? deduplicateRecords(records, table, parallelism) : records;
}
/**
* Deduplicate Hoodie records, using the given deduplication function.
*
* @param records hoodieRecords to deduplicate
* @param parallelism parallelism or partitions to be used while reducing/deduplicating
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(
I records, HoodieTable<T, I, K, O> table, int parallelism) {
return deduplicateRecords(records, table.getIndex(), parallelism);
}
// SparkWriteHelper
@Override
public JavaRDD<HoodieRecord<T>> deduplicateRecords(
JavaRDD<HoodieRecord<T>> records, HoodieIndex<T, ?, ?, ?> index, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
// get the record's key
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
// emit (key, record) pairs
return new Tuple2<>(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
// records with the same key go through preCombine, which returns the one with the larger preCombineField value
T reducedData = (T) rec2.getData().preCombine(rec1.getData());
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
return new HoodieRecord<T>(reducedKey, reducedData);
}, parallelism).map(Tuple2::_2);
}
This is how the pre-combine feature is implemented.
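As a hedged, single-JVM sketch of the same reduceByKey semantics (DedupSketch is not Hudi code; it just mimics SparkWriteHelper.deduplicateRecords), records sharing a record key are folded together with preCombine:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
public class DedupSketch {
  // fold records with the same record key via preCombine, like reduceByKey does
  public static List<HoodieRecord<OverwriteWithLatestAvroPayload>> dedup(
      List<HoodieRecord<OverwriteWithLatestAvroPayload>> records) {
    Map<String, HoodieRecord<OverwriteWithLatestAvroPayload>> byKey = new HashMap<>();
    for (HoodieRecord<OverwriteWithLatestAvroPayload> rec : records) {
      byKey.merge(rec.getRecordKey(), rec, (r1, r2) -> {
        // same reduce function as SparkWriteHelper: keep the payload with the larger orderingVal
        OverwriteWithLatestAvroPayload reduced = r2.getData().preCombine(r1.getData());
        HoodieKey reducedKey = r1.getData().equals(reduced) ? r1.getKey() : r2.getKey();
        return new HoodieRecord<>(reducedKey, reduced);
      });
    }
    return new ArrayList<>(byKey.values());
  }
}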
Modifying the historical comparison value
Finally, how is the historical comparison field changed? In practice, Spark SQL and Spark DF do not need to change it explicitly, because by default it is modified in sync with preCombineField. Let's see how the code keeps them in sync.
Both SQL and DF eventually call HoodieSparkSqlWriter.write:
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
String tblName, Map<String, String> parameters) {
return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
}
public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
String tblName, Map<String, String> parameters) {
boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
.equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key()));
boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE().key()));
// insert/bulk-insert combining to be true, if filtering for duplicates
boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS().key()));
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
.withPath(basePath).withAutoCommit(false).combineInput(combineInserts, true);
if (schemaStr != null) {
builder = builder.withSchema(schemaStr);
}
return builder.forTable(tblName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
.withInlineCompaction(inlineCompact).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(inlineClusteringEnabled)
.withAsyncClustering(asyncClusteringEnabled).build())
// here the ordering field is set to the value of PRECOMBINE_FIELD, so by default it is modified in sync with PRECOMBINE_FIELD
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
.build())
// override above with Hoodie configs specified as options.
.withProps(parameters).build();
}
If you really do want the ordering field to differ from PRECOMBINE_FIELD:
SQL:
set hoodie.payload.ordering.field=ts;
DF:
.option("hoodie.payload.ordering.field", "ts")
or
.option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts")
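For example, a hedged DF sketch (the version column and the path are assumptions) where the pre-combine field and the ordering field differ:
// `df` is assumed to contain id, ts and version columns (hypothetical)
df.write().format("hudi")
  .option("hoodie.table.name", "test_hudi_table")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")   // used by preCombine among incoming records
  .option("hoodie.payload.ordering.field", "version")         // used when merging against the persisted record
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
  .mode("append")
  .save("/tmp/test_hudi_table");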
Source: https://blog.csdn.net/dkl12/article/details/123070348