[HUDI-8210] Support writing table version 6 log format from 1.x
 - Annotate log headers, blocks with table version (tv)
 - Handle both tv=6 and tv=8 for log naming
 - Eliminate notion of rolloverWriteToken
 - Simplify/make efficient log writer building across code paths
 - Handle both tv=6 and tv=8 for log version handling
 - Prevent tv=8 headers from being written with tv=6.
 - Bring back tv=6 rollback behavior.
vinothchandar committed Nov 5, 2024
1 parent b216d79 commit 52f31ab
Showing 30 changed files with 350 additions and 227 deletions.
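Reading guide for the diffs below: the log-writer builder API is renamed from withDeltaCommit(...) to withInstantTime(...) and from withRolloverLogWriteToken(...) to withLogWriteToken(...), and gains withTableVersion(...) so one writer can emit either tv=6 or tv=8 log naming and headers. The following is a minimal sketch of the new builder chain, assuming a HoodieStorage handle, partition path, and write token are already in hand; the wrapper class and method are hypothetical and not part of this commit.

import java.io.IOException;

import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

// Hypothetical wrapper, for illustration only.
public class LogWriterSketch {
  static HoodieLogFormat.Writer openLogWriter(StoragePath partitionPath, String fileId,
      String instantTime, String writeToken, HoodieStorage storage,
      HoodieTableVersion writeVersion) throws IOException {
    return HoodieLogFormat.newWriterBuilder()
        .onParentPath(partitionPath)
        .withFileId(fileId)
        // previously .withDeltaCommit(instantTime)
        .withInstantTime(instantTime)
        // previously .withRolloverLogWriteToken(writeToken)
        .withLogWriteToken(writeToken)
        // new: drives tv=6 vs tv=8 log file naming and block headers
        .withTableVersion(writeVersion)
        .withFileSize(0L)
        .withStorage(storage)
        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
        .build();
  }
}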
@@ -112,7 +112,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(new StoragePath(partitionPath))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").withDeltaCommit("100").withStorage(storage)
.withFileId("test-log-fileid1").withInstantTime("100").withStorage(storage)
.withSizeThreshold(1).build()) {

// write data to file
@@ -209,7 +209,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
writer =
HoodieLogFormat.newWriterBuilder().onParentPath(new StoragePath(partitionPath))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").withDeltaCommit(INSTANT_TIME).withStorage(
.withFileId("test-log-fileid1").withInstantTime(INSTANT_TIME).withStorage(
storage)
.withSizeThreshold(500).build();

@@ -36,6 +36,7 @@
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -154,46 +155,74 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
this.shouldWriteRecordPositions = config.shouldWriteRecordPositions()
// record positions supported only from table version 8
&& config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
}

public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
}

private void init(HoodieRecord record) {
if (!doInit) {
return;
}

String prevCommit = instantTime;
private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat deltaWriteStat) {
HoodieTableVersion tableVersion = hoodieTable.version();
String prevCommit;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic update sequence.

if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
// table versions 8 and greater.
prevCommit = instantTime;
if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic update sequence.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
}
}
} else {
// older table versions.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
} else {
// Set the base commit time as the current instantTime for new inserts into log files
prevCommit = instantTime;
// Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
// NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
if (record.getCurrentLocation() != null && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
prevCommit = record.getCurrentLocation().getInstantTime();
}
// This means there is no base data file, start appending to a new log file
LOG.info("New file group from append handle for partition {}", partitionPath);
}
}

deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);
}

private void init(HoodieRecord record) {
if (!doInit) {
return;
}
// Prepare the first write status
HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
writeStatus.setStat(deltaWriteStat);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
averageRecordSize = sizeEstimator.sizeEstimate(record);

deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setPartitionPath(partitionPath);
deltaWriteStat.setFileId(fileId);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);

populateWriteStat(record, deltaWriteStat);
averageRecordSize = sizeEstimator.sizeEstimate(record);
try {
// Save hoodie partition meta in the partition path
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
@@ -202,12 +231,14 @@ private void init(HoodieRecord record) {
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave();

this.writer = createLogWriter(getFileInstant(record));
String instantTime = config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
? getFileInstant(record) : deltaWriteStat.getPrevCommit();
this.writer = createLogWriter(instantTime);
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
+ instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
+ instantTime + " on storage path " + hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
}
doInit = false;
}
@@ -454,7 +485,7 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
if (!recordList.isEmpty()) {
String keyField = config.populateMetaFields()
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
Expand All @@ -463,12 +494,12 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
getUpdatedHeader(header, config), keyField));
}

if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
if (appendDeleteBlocks && !recordsToDeleteWithPositions.isEmpty()) {
blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions, shouldWriteRecordPositions,
getUpdatedHeader(header, config)));
}

if (blocks.size() > 0) {
if (!blocks.isEmpty()) {
AppendResult appendResult = writer.appendBlocks(blocks);
processAppendResult(appendResult, recordList);
recordList.clear();
@@ -234,27 +234,28 @@ private static Schema getWriteSchema(HoodieWriteConfig config) {
return new Schema.Parser().parse(config.getWriteSchema());
}

protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime) {
return createLogWriter(deltaCommitTime, null);
protected HoodieLogFormat.Writer createLogWriter(String instantTime) {
return createLogWriter(instantTime, null);
}

protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime, String fileSuffix) {
protected HoodieLogFormat.Writer createLogWriter(String instantTime, String fileSuffix) {
try {
return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.withDeltaCommit(deltaCommitTime)
.withInstantTime(instantTime)
.withFileSize(0L)
.withSizeThreshold(config.getLogFileMaxSize())
.withStorage(storage)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(writeToken)
.withFileCreationCallback(getLogCreationCallback())
.withTableVersion(config.getWriteVersion())
.withSuffix(fileSuffix)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.build();
} catch (IOException e) {
throw new HoodieException("Creating logger writer with fileId: " + fileId + ", "
+ "delta commit time: " + deltaCommitTime + ", "
+ "delta commit time: " + instantTime + ", "
+ "file suffix: " + fileSuffix + " error");
}
}
@@ -920,13 +920,13 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), partitionName))
.withFileId(fileGroupFileId)
.withDeltaCommit(instantTime)
.withInstantTime(instantTime)
.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
.withFileSize(0L)
.withSizeThreshold(metadataWriteConfig.getLogFileMaxSize())
.withStorage(dataMetaClient.getStorage())
.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
.withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
.withTableVersion(metadataWriteConfig.getWriteVersion())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build()) {
writer.appendBlock(block);
}
@@ -47,6 +47,7 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -148,6 +149,10 @@ public boolean isMetadataTable() {
return isMetadataTable;
}

public HoodieTableVersion version() {
return metaClient.getTableConfig().getTableVersion();
}

protected abstract HoodieIndex<?, ?> getIndex(HoodieWriteConfig config, HoodieEngineContext context);

private synchronized FileSystemViewManager getViewManager() {
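The version() accessor added above gives write paths one place to read the table version off the table config; the rest of the commit gates tv=8-only behavior (record positions, tv=8 block headers, the new log naming) on it. Below is a hedged sketch of that gate as a caller might use it; the helper class and method name are illustrative, not from this commit.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;

// Illustrative helper; mirrors the checks added in HoodieAppendHandle and the rollback path.
public class TableVersionGateSketch {
  static boolean writesTableVersionEightFormat(HoodieTableMetaClient metaClient) {
    HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();
    // tv=8 features (e.g. record positions in log blocks) are enabled only for version 8+ tables.
    return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT);
  }
}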
@@ -21,10 +21,12 @@
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -35,6 +37,7 @@
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.CommonClientUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +116,7 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
HoodieInstant instantToRollback,
List<SerializableHoodieRollbackRequest> rollbackRequests,
boolean doDelete, int numPartitions) {
final TaskContextSupplier taskContextSupplier = context.getTaskContextSupplier();
return context.flatMap(rollbackRequests, (SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String, HoodieRollbackStat>>>) rollbackRequest -> {
List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
if (!filesToBeDeleted.isEmpty()) {
@@ -125,12 +129,17 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
final StoragePath filePath;
try {
String fileId = rollbackRequest.getFileId();
HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();

writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(fileId)
.withDeltaCommit(instantToRollback.getTimestamp())
.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
.withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
? instantToRollback.getTimestamp() : rollbackRequest.getLatestBaseInstant()
)
.withStorage(metaClient.getStorage())
.withTableVersion(tableVersion)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

// generate metadata
@@ -162,20 +171,19 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
1L
);

return Collections.singletonList(
return Stream.of(
Pair.of(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder()
.withPartitionPath(rollbackRequest.getPartitionPath())
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
.build()))
.stream();
.build()));
} else {
return Collections.singletonList(
// no action needed.
return Stream.of(
Pair.of(rollbackRequest.getPartitionPath(),
HoodieRollbackStat.newBuilder()
.withPartitionPath(rollbackRequest.getPartitionPath())
.build()))
.stream();
.build()));
}
}, numPartitions);
}
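The rollback change above also picks the instant used to name the rollback log file by table version: tv=8 uses the instant being rolled back, while tv=6 falls back to the latest base instant of the file slice, restoring the pre-1.x rollback behavior. A compact sketch of that decision as a hypothetical helper (not part of the commit):

import org.apache.hudi.common.table.HoodieTableVersion;

// Hypothetical helper; condenses the ternary used when building the rollback log writer above.
public class RollbackLogInstantSketch {
  static String rollbackLogInstant(HoodieTableVersion tableVersion,
      String instantToRollback, String latestBaseInstant) {
    // tv=8: the rollback command block is appended under the instant being rolled back;
    // tv=6: fall back to the latest base instant, matching pre-1.x behavior.
    return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
        ? instantToRollback
        : latestBaseInstant;
  }
}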
(remaining changed files not shown)
