Disabling auto commit in mdt
nsivabalan committed Nov 4, 2024
1 parent d1c5423 commit 3c5bbc8
Showing 15 changed files with 292 additions and 180 deletions.
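
In short, this commit moves the metadata table (MDT) writer off auto-commit and onto explicit commits: createMetadataWriteConfig now builds the MDT write config with withAutoCommit(false) and drops the check that required auto-commit, HoodieBackedTableMetadataWriter picks up a second type parameter O for the write output, and new engine-specific writeAndCommitBulkInsert / writeAndCommitUpsert hooks (implemented for Spark, Flink, and Java) write the prepped records and then call writeClient.commit themselves. Compaction, log compaction, and partition deletes against the MDT are committed explicitly as well. A minimal sketch of the resulting write path, using the Spark types from this diff (setup of the write client, records, and instant time is assumed):

    // Hedged sketch of the new pattern, not the full implementation: with
    // auto-commit disabled, every write to the MDT is paired with an explicit commit.
    JavaRDD<WriteStatus> writeStatuses = writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
    writeClient.commit(instantTime, writeStatuses);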
@@ -3123,7 +3123,7 @@ public Builder withTTLConfig(HoodieTTLConfig ttlConfig) {
}

public Builder withAutoCommit(boolean autoCommit) {
// writeConfig.setValue(AUTO_COMMIT_ENABLE, String.valueOf(autoCommit));
writeConfig.setValue(AUTO_COMMIT_ENABLE, String.valueOf(autoCommit));
return this;
}

@@ -76,6 +76,7 @@
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -129,7 +130,7 @@
*
* @param <I> Type of input for the write client
*/
public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableMetadataWriter {
public abstract class HoodieBackedTableMetadataWriter<I,O> implements HoodieTableMetadataWriter {

private static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class);

@@ -141,7 +142,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
// Record index has a fixed size schema. This has been calculated based on experiments with default settings
// for block size (1MB), compression (GZ) and disabling the hudi metadata fields.
private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
private transient BaseHoodieWriteClient<?, I, ?, ?> writeClient;
private transient BaseHoodieWriteClient<?, I, ?, O> writeClient;

protected HoodieWriteConfig metadataWriteConfig;
protected HoodieWriteConfig dataWriteConfig;
@@ -1370,7 +1371,7 @@ protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieR
HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
I preppedRecordInputs = convertHoodieDataToEngineSpecificData(preppedRecords);

BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
BaseHoodieWriteClient<?, I, ?, O> writeClient = getWriteClient();
// rollback partially failed writes if any.
if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && writeClient.rollbackFailedWrites()) {
metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
@@ -1404,10 +1405,10 @@ protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieR
preWrite(instantTime);
if (isInitializing) {
engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Bulk inserting at %s into metadata table %s", instantTime, metadataWriteConfig.getTableName()));
writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
writeAndCommitBulkInsert(writeClient, instantTime, preppedRecordInputs, bulkInsertPartitioner);
} else {
engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Upserting at %s into metadata table %s", instantTime, metadataWriteConfig.getTableName()));
writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
writeAndCommitUpsert(writeClient, instantTime, preppedRecordInputs);
}

metadataMetaClient.reloadActiveTimeline();
@@ -1416,6 +1417,10 @@ protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieR
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, dataMetaClient.getTableConfig().getMetadataPartitions()));
}

protected abstract void writeAndCommitBulkInsert(BaseHoodieWriteClient<?, I, ?, O> writeClient, String instantTime, I preppedRecordInputs, Option<BulkInsertPartitioner> bulkInsertPartitioner);

protected abstract void writeAndCommitUpsert(BaseHoodieWriteClient<?, I, ?, O> writeClient, String instantTime, I preppedRecordInputs);

/**
* Allows the implementation to perform any pre-commit operations like transitioning a commit to inflight if required.
*
Expand Down Expand Up @@ -1566,15 +1571,17 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient) {
LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime);
} else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime);
writeClient.compact(compactionInstantTime);
HoodieWriteMetadata<O> compactionWriteMetadata = writeClient.compact(compactionInstantTime);
writeClient.commitCompaction(compactionInstantTime, compactionWriteMetadata.getCommitMetadata().get(), Option.empty());
} else if (metadataWriteConfig.isLogCompactionEnabled()) {
// Schedule and execute log compaction with new instant time.
final String logCompactionInstantTime = metadataMetaClient.createNewInstantTime(false);
if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
LOG.info("Log compaction with same {} time is already present in the timeline.", logCompactionInstantTime);
} else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) {
LOG.info("Log compaction is scheduled for timestamp {}", logCompactionInstantTime);
writeClient.logCompact(logCompactionInstantTime);
HoodieWriteMetadata<O> logCompactionWriteMetadata = writeClient.logCompact(logCompactionInstantTime);
writeClient.commitLogCompaction(logCompactionInstantTime, logCompactionWriteMetadata.getCommitMetadata().get(), Option.empty());
}
}
}
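
Because auto-commit is off, the compaction path above now finalizes its own instant: compact returns HoodieWriteMetadata<O>, and the writer immediately calls commitCompaction with the resulting commit metadata (log compaction follows the same shape via logCompact / commitLogCompaction). Condensed from the hunk above, assuming the compaction instant was scheduled successfully:

    // Sketch mirroring the change above; scheduling guards and logging omitted.
    HoodieWriteMetadata<O> compactionWriteMetadata = writeClient.compact(compactionInstantTime);
    writeClient.commitCompaction(compactionInstantTime, compactionWriteMetadata.getCommitMetadata().get(), Option.empty());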
@@ -1765,12 +1772,12 @@ public boolean isInitialized() {
return initialized;
}

protected BaseHoodieWriteClient<?, I, ?, ?> getWriteClient() {
protected BaseHoodieWriteClient<?, I, ?, O> getWriteClient() {
if (writeClient == null) {
writeClient = initializeWriteClient();
}
return writeClient;
}

protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();
protected abstract BaseHoodieWriteClient<?, I, ?, O> initializeWriteClient();
}
@@ -120,7 +120,7 @@ public static HoodieWriteConfig createMetadataWriteConfig(
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
.withAutoCommit(true)
.withAutoCommit(false)
.withAvroSchemaValidate(false)
.withEmbeddedTimelineServerEnabled(false)
.withMarkersType(MarkerType.DIRECT.name())
@@ -256,8 +256,6 @@ public static HoodieWriteConfig createMetadataWriteConfig(
// Inline compaction and auto clean is required as we do not expose this table outside
ValidationUtils.checkArgument(!metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
ValidationUtils.checkArgument(!metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
// Auto commit is required
ValidationUtils.checkArgument(metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
ValidationUtils.checkArgument(metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()),
"MDT should use " + FailOnFirstErrorWriteStatus.class.getName());
// Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
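Net effect on the MDT write config: auto-commit is disabled when the config is built, and the old "Auto commit is required for Metadata Table" validation is removed, so the explicit commits issued by the metadata writer are the only commit path. A trimmed sketch of the relevant builder calls (base path, table name, and the other options from createMetadataWriteConfig are elided; the chain is abbreviated):

    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
        // remaining options from createMetadataWriteConfig omitted in this sketch
        .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
        .withAutoCommit(false) // MDT commits are now issued explicitly by the metadata writer
        .withEmbeddedTimelineServerEnabled(false);
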
@@ -54,7 +54,7 @@
/**
* Flink hoodie backed table metadata writer.
*/
public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter<List<HoodieRecord>> {
public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter<List<HoodieRecord>, List<WriteStatus>> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);

public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, HoodieWriteConfig writeConfig,
@@ -172,13 +172,26 @@ protected void commitInternal(String instantTime, Map<String, HoodieData<HoodieR
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, dataMetaClient.getTableConfig().getMetadataPartitions()));
}

@Override
protected void writeAndCommitBulkInsert(BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord> preppedRecordInputs,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
List<WriteStatus> writeStatusJavaRDD = writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
writeClient.commit(instantTime, writeStatusJavaRDD);
}

@Override
protected void writeAndCommitUpsert(BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord> preppedRecordInputs) {
List<WriteStatus> writeStatusJavaRDD = writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
writeClient.commit(instantTime, writeStatusJavaRDD);
}

@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
throw new HoodieNotSupportedException("Dropping metadata index not supported for Flink metadata table yet.");
}

@Override
public BaseHoodieWriteClient<?, List<HoodieRecord>, ?, ?> initializeWriteClient() {
public BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> initializeWriteClient() {
return new HoodieFlinkWriteClient(engineContext, metadataWriteConfig);
}

@@ -20,6 +20,7 @@

import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -33,6 +34,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieJavaTable;
import org.apache.hudi.table.HoodieTable;

@@ -44,7 +46,7 @@

import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;

public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter<List<HoodieRecord>> {
public class JavaHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter<List<HoodieRecord>, List<WriteStatus>> {

/**
* Hudi backed table metadata writer.
@@ -108,7 +110,20 @@ protected void bulkCommit(String instantTime, String partitionName, HoodieData<H
}

@Override
protected BaseHoodieWriteClient<?, List<HoodieRecord>, ?, ?> initializeWriteClient() {
protected void writeAndCommitBulkInsert(BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord> preppedRecordInputs,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
List<WriteStatus> writeStatusJavaRDD = writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
writeClient.commit(instantTime, writeStatusJavaRDD);
}

@Override
protected void writeAndCommitUpsert(BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> writeClient, String instantTime, List<HoodieRecord> preppedRecordInputs) {
List<WriteStatus> writeStatusJavaRDD = writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
writeClient.commit(instantTime, writeStatusJavaRDD);
}

@Override
protected BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> initializeWriteClient() {
return new HoodieJavaWriteClient(engineContext, metadataWriteConfig);
}

@@ -21,7 +21,9 @@
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkFunctionalIndex;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
import org.apache.hudi.common.data.HoodieData;
@@ -46,6 +48,7 @@
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;

@@ -74,7 +77,7 @@
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;

public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter<JavaRDD<HoodieRecord>> {
public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter<JavaRDD<HoodieRecord>, JavaRDD<WriteStatus>> {

private static final Logger LOG = LoggerFactory.getLogger(SparkHoodieBackedTableMetadataWriter.class);

@@ -157,6 +160,19 @@ protected void bulkCommit(
commitInternal(instantTime, Collections.singletonMap(partitionName, records), true, Option.of(partitioner));
}

@Override
protected void writeAndCommitBulkInsert(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String instantTime, JavaRDD<HoodieRecord> preppedRecordInputs,
Option<BulkInsertPartitioner> bulkInsertPartitioner) {
JavaRDD<WriteStatus> writeStatusJavaRDD = writeClient.bulkInsertPreppedRecords(preppedRecordInputs, instantTime, bulkInsertPartitioner);
writeClient.commit(instantTime, writeStatusJavaRDD);
}

@Override
protected void writeAndCommitUpsert(BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> writeClient, String instantTime, JavaRDD<HoodieRecord> preppedRecordInputs) {
JavaRDD<WriteStatus> writeStatusJavaRDD = writeClient.upsertPreppedRecords(preppedRecordInputs, instantTime);
writeClient.commit(instantTime, writeStatusJavaRDD);
}

@Override
public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
@@ -165,7 +181,8 @@ public void deletePartitions(String instantTime, List<MetadataPartitionType> par
SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
writeClient.startCommitWithTime(instantTime, actionType);
writeClient.deletePartitions(partitionsToDrop, instantTime);
HoodieWriteResult writeResult = writeClient.deletePartitions(partitionsToDrop, instantTime);
writeClient.commit(instantTime, writeResult.getWriteStatuses(), Option.empty(), actionType, writeResult.getPartitionToReplaceFileIds(), Option.empty());
}
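
Dropping metadata partitions follows the same discipline: deletePartitions now returns a HoodieWriteResult, and the replace commit is completed explicitly with its write statuses and partition-to-replace-file-ids rather than relying on auto-commit. Condensed from the hunk above:

    // Sketch mirroring the change above; the action type comes from CommitUtils.getCommitActionType.
    HoodieWriteResult writeResult = writeClient.deletePartitions(partitionsToDrop, instantTime);
    writeClient.commit(instantTime, writeResult.getWriteStatuses(), Option.empty(), actionType,
        writeResult.getPartitionToReplaceFileIds(), Option.empty());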

@Override
@@ -227,7 +244,7 @@ protected HoodieTable getTable(HoodieWriteConfig writeConfig, HoodieTableMetaCli
}

@Override
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?> initializeWriteClient() {
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> initializeWriteClient() {
return new SparkRDDWriteClient(engineContext, metadataWriteConfig, Option.empty());
}
