
[HUDI-8210] Support writing table version 6 log format from 1.x #12206

Open

vinothchandar wants to merge 3 commits into master from hudi-8210-code-prep

Conversation

vinothchandar (Member) commented Nov 4, 2024

Change Logs

  • Annotate log headers and blocks with the table version (tv)
  • Handle both tv=6 and tv=8 log naming (see the sketch after this list)
  • Eliminate the notion of rolloverWriteToken
  • Simplify log writer building across code paths and make it more efficient
  • Handle the log format version for both tv=6 and tv=8
  • Prevent tv=8 headers from being written with tv=6
  • Bring back tv=6 rollback behavior
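For illustration, a minimal self-contained sketch of how log file naming can be gated on table version. The enum, helper, and naming pattern below are assumptions for the sketch, not the PR's actual code.

public class LogNamingSketch {
  enum TableVersion { SIX, EIGHT }

  // Assumption: tv=6 keys the log file name on the file slice's base instant,
  // while tv=8 keys it on the delta commit that wrote the block.
  static String logFileName(TableVersion tv, String fileId, String baseInstant,
                            String deltaCommit, int logVersion, String writeToken) {
    String instant = (tv == TableVersion.EIGHT) ? deltaCommit : baseInstant;
    return "." + fileId + "_" + instant + ".log." + logVersion + "_" + writeToken;
  }

  public static void main(String[] args) {
    // prints ".f1_001.log.1_0-1-0" (tv=6) and ".f1_003.log.1_0-1-0" (tv=8)
    System.out.println(logFileName(TableVersion.SIX, "f1", "001", "003", 1, "0-1-0"));
    System.out.println(logFileName(TableVersion.EIGHT, "f1", "001", "003", 1, "0-1-0"));
  }
}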

Impact

Hudi writers can now write the table version 6 compatible log format, in addition to the table version 8 format.

Risk level (write none, low, medium or high below)

low

Documentation Update

No new configurations.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

github-actions bot added the size:L (PR with lines of changes in (300, 1000]) label on Nov 4, 2024
vinothchandar force-pushed the hudi-8210-code-prep branch 2 times, most recently from 94f39b3 to 6b433e7 (November 5, 2024 01:42)
vinothchandar (Member Author)

@hudi-bot run azure

apache deleted a comment from hudi-bot on Nov 5, 2024
vinothchandar (Member Author)

@hudi-bot run azure

vinothchandar force-pushed the hudi-8210-code-prep branch 2 times, most recently from ac58a5d to 52f31ab (November 5, 2024 04:11)
prevCommit = instantTime;
// Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
// NOTE: the Flink engine uses instantTime to mark the operation type; check BaseFlinkCommitActionExecutor::execute
if (record.getCurrentLocation() != null && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
Contributor:

It should be just getFileInstant(record).

Member Author:

Do you see an issue with keeping this as-is? It makes it easy to debug across 0.15 and 1.0.

Contributor:

The logic is actually the same.

Member Author:

@danny0405 can you check if you are OK with this? Renamed and pushed a change.
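For context, a hedged sketch of the equivalence being discussed: the inline check above and a getFileInstant-style helper express the same logic. The signature and imports below are assumptions, not Hudi's actual implementation.

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;

public class FileInstantSketch {
  // Returns the instant to key the log file on: reuse the record's current
  // location instant when it is valid (e.g. set by a concurrent clustering
  // writer with a consistent hashing bucket index), else the current write's.
  static String getFileInstant(HoodieRecord<?> record, String instantTime) {
    if (record.getCurrentLocation() != null
        && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
      return record.getCurrentLocation().getInstantTime();
    }
    return instantTime;
  }
}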


writer = HoodieLogFormat.newWriterBuilder()
    .onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
    .withFileId(fileId)
    .withDeltaCommit(instantToRollback.getTimestamp())
    .withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
Contributor:

The write token is meaningless here, because the rollback tasks have a different task context from the writer task.

Member Author:

I'll check this out. I was trying to avoid doing FS listings or anything similarly expensive, without this being passed in.

Member Author:

At least for Spark, this makes sense I think: each task is a rollback of a given file group, so they'd have different write tokens. My intention is to avoid computing this from storage. Let me know if you have concerns with proceeding.
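To illustrate the trade-off in this thread, a sketch of deriving a write token purely from the task context, with no filesystem listing. The method body is an assumption, not the actual CommonClientUtils implementation; Hudi's tokens generally follow a partitionId-stageId-attemptId shape.

public class WriteTokenSketch {
  // Each rollback task targets one file group, so a token derived from the
  // task context is unique per task, even though it will differ from the
  // original writer task's token (the concern raised above).
  static String generateWriteToken(int partitionId, int stageId, long attemptId) {
    return partitionId + "-" + stageId + "-" + attemptId;
  }
}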

danny0405 (Contributor)

@vinothchandar Thanks for the contribution. Are the write version and the table version the same thing? What happens when the two are inconsistent?

vinothchandar (Member Author)

@danny0405 The write fails in that case (after the upgrade flow). I wired in the check in CommonClientUtils::validateXXX in the last PR.
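The exact check name is elided above (CommonClientUtils::validateXXX), so the following is only a hypothetical shape of such a guard, for illustration:

public class VersionGuardSketch {
  // Hypothetical: fail the write when the writer's version does not match the
  // table's version, directing the user through the upgrade/downgrade flow.
  static void validateWriteVersion(int tableVersion, int writeVersion) {
    if (tableVersion != writeVersion) {
      throw new IllegalStateException("Write version " + writeVersion
          + " does not match table version " + tableVersion
          + "; run the upgrade/downgrade flow first.");
    }
  }
}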

danny0405 (Contributor)

@vinothchandar I'm wondering if we can just abstract out the functional discrepancies as a new, extensible method; then we can have a HoodieAppendHandleFactory for the current and legacy impls, like what we did for HoodieMergeHandle: we introduced HoodieMergeHandleFactory to handle the ramifications of regular writes and CDC writes. This way we can make the current HoodieAppendHandle much cleaner and easier to maintain.

Same thoughts for HoodieLogFormat.
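As a rough illustration of this suggestion (class and method names are hypothetical, not an agreed design), a factory by analogy with HoodieMergeHandleFactory might look like:

public class AppendHandleFactorySketch {
  interface AppendHandle { void doAppend(); }

  // tv=8 behavior lives in one impl, tv=6 behavior in the other, keeping each
  // handle free of version branches.
  static class CurrentAppendHandle implements AppendHandle {
    public void doAppend() { /* tv=8 log format path */ }
  }
  static class LegacyAppendHandle implements AppendHandle {
    public void doAppend() { /* tv=6 log format path */ }
  }

  static AppendHandle create(boolean tableVersion8OrAbove) {
    return tableVersion8OrAbove ? new CurrentAppendHandle() : new LegacyAppendHandle();
  }
}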

vinothchandar marked this pull request as ready for review on November 13, 2024 at 03:03
vinothchandar (Member Author)

@danny0405 Filed https://issues.apache.org/jira/browse/HUDI-8509 to track better abstractions. These are pretty localized changes for now (unlike the timeline, which needs better abstractions because of how widely it is used).

github-actions bot added the size:XL (PR with lines of changes > 1000) label and removed the size:L (PR with lines of changes in (300, 1000]) label on Nov 13, 2024
vinothchandar (Member Author)

@bvaradar @danny0405 Can you review this again?

@bvaradar I could not get to the following tests, given the change of plans to land this first.

When you test this alongside yours, please consider:

  • A functional test that writes a tv=6 CoW/MoR table and can successfully compact/rollback (marker based) without needing an upgrade to tv=8.
  • Spark should be able to read logs with rollbacks (TestHoodieLogFormat already covers the non file group reader tests).

vinothchandar (Member Author)

@danny0405 Can you do a full review please, so I can address everything in one go? Or is that the only comment you had?

 - Annotate log headers and blocks with the table version (tv)
 - Handle both tv=6 and tv=8 for log naming
 - Eliminate the notion of rolloverWriteToken
 - Simplify log writer building across code paths and make it more efficient
 - Handle the log format version for both tv=6 and tv=8
 - Prevent tv=8 headers from being written with tv=6
 - Bring back tv=6 rollback behavior
 - Simple adapters in AbstractFileSystemView to handle filtering of uncommitted logs/files in 0.x
 - Validations for disallowed writer configs around NBCC and log format, for table version 6 writing from 1.x
 - TestHoodieTableFileSystemView tests have all been redone to handle both styles of log naming
 - New unit tests
@@ -1039,13 +1109,14 @@ public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partit
public final Stream<FileSlice> getAllLogsMergedFileSliceBeforeOrOn(String partitionStr, String maxInstantTime) {
try {
readLock.lock();
HoodieTableVersion version = metaClient.getTableConfig().getTableVersion();
Contributor:

This is not used.

@@ -987,13 +1051,17 @@ private Option<FileSlice> getLatestFileSliceFilteringUncommittedFiles(Stream<Fil
public final Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String maxCommitTime) {
try {
readLock.lock();
HoodieTableVersion version = metaClient.getTableConfig().getTableVersion();
Contributor:

This is not used.

return Option.ofNullable(filterUncommittedFiles(fs.get(), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
Stream<FileSlice> fileSlices = tableVersion8AndAbove()
    ? this.filterUncommittedFiles(fs.get(), true)
    : this.filterBaseFileAfterPendingCompaction(fs.get(), true);
Contributor:

Do we need to also fix the merged log record scanner to bring back the pending instant check for the log blocks because of these unfiltered log files?
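For readability of the snippet above: tableVersion8AndAbove() is presumably a small helper on the view that compares the table config's version. A sketch of that assumption (the actual helper in AbstractFileSystemView may differ):

private boolean tableVersion8AndAbove() {
  // Compare the table's configured version against version 8 (assumption).
  return metaClient.getTableConfig().getTableVersion().versionCode()
      >= HoodieTableVersion.EIGHT.versionCode();
}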

vinothchandar (Member Author) commented Nov 14, 2024

Pending

hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build
