[HUDI-8433] Fix issuedOffset not being updated when stream read hits empty commits (
fhan688 authored Nov 8, 2024
1 parent f8ae5f5 commit 3a57591
Showing 2 changed files with 58 additions and 1 deletion.
@@ -19,6 +19,7 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -214,8 +215,10 @@ public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> cont
       }
       IncrementalInputSplits.Result result =
           incrementalInputSplits.inputSplits(metaClient, this.issuedOffset, this.cdcEnabled);
-      if (result.isEmpty()) {
+
+      if (result.isEmpty() && StringUtils.isNullOrEmpty(result.getEndInstant())) {
         // no new instants, returns early
+        LOG.warn("Result is empty, do not update issuedInstant.");
         return;
       }

@@ -282,4 +285,9 @@ private void registerMetrics() {
     readMetrics = new FlinkStreamReadMetrics(metrics);
     readMetrics.registerMetrics();
   }
+
+  public String getIssuedOffset() {
+    return issuedOffset;
+  }
+
 }
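
Why the guard change matters: before this commit the monitor returned as soon as `result.isEmpty()`, so a run of empty commits produced empty results on every poll and `issuedOffset` never advanced past them. With the fix, the early return fires only when the result carries neither splits nor an end instant; an empty result that does carry an end instant (empty commits were consumed) falls through and the offset moves forward. Below is a minimal, self-contained sketch of that behavior; the `Result` stand-in and the offset update are simplified assumptions, not the real IncrementalInputSplits.Result or StreamReadMonitoringFunction.

import java.util.Collections;
import java.util.List;

public class MonitorSketch {

  // Stand-in for IncrementalInputSplits.Result (assumed, simplified shape).
  static class Result {
    final List<String> splits;
    final String endInstant;

    Result(List<String> splits, String endInstant) {
      this.splits = splits;
      this.endInstant = endInstant;
    }

    boolean isEmpty() {
      return splits.isEmpty();
    }

    String getEndInstant() {
      return endInstant;
    }
  }

  private String issuedOffset; // last instant already handed downstream

  void poll(Result result) {
    // Pre-fix guard was `if (result.isEmpty()) { return; }`: empty commits
    // yielded an empty result, so issuedOffset never moved past them.
    if (result.isEmpty() && (result.getEndInstant() == null || result.getEndInstant().isEmpty())) {
      return; // genuinely nothing new on the timeline
    }
    // Empty commits fall through to here: no splits to emit, but the offset
    // still advances, so the next poll starts after endInstant.
    issuedOffset = result.getEndInstant();
  }

  public static void main(String[] args) {
    MonitorSketch monitor = new MonitorSketch();
    monitor.poll(new Result(Collections.emptyList(), "20241108120000001")); // empty commit consumed
    System.out.println(monitor.issuedOffset); // prints the instant, not null
  }
}

The second changed file adds a regression test for exactly this scenario: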
@@ -19,9 +19,13 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
@@ -42,6 +46,7 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -51,6 +56,8 @@
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -143,6 +150,48 @@ public void testConsumeFromLastCommit() throws Exception {
     }
   }
 
+  @Test
+  public void testConsumeForSpeedLimitWhenEmptyCommitExists() throws Exception {
+    // Step 1: create 4 empty commits
+    Configuration conf = new Configuration(this.conf);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true);
+
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(conf.get(FlinkOptions.PATH), HoodieTableType.COPY_ON_WRITE);
+    HoodieTimeline commitsTimeline = metaClient.reloadActiveTimeline()
+        .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+
+    // Step 2: trigger a streaming read from the first instant with READ_COMMITS_LIMIT set to 2
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+    conf.set(FlinkOptions.READ_COMMITS_LIMIT, 2);
+    conf.set(FlinkOptions.READ_START_COMMIT, String.valueOf(Long.valueOf(firstInstant.getTimestamp()) - 100));
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(0);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+      function.monitorDirAndForwardSplits(sourceContext);
+      assertEquals(0, sourceContext.splits.size(), "There should be no input splits");
+
+      // Step 3: assert that the current issuedOffset is not null.
+      // Based on IncrementalInputSplits#inputSplits => .startCompletionTime(issuedOffset != null ? issuedOffset : this.conf.getString(FlinkOptions.READ_START_COMMIT)),
+      // if issuedOffset were still null, Hudi would fall back to FlinkOptions.READ_START_COMMIT again, meaning the streaming read would be blocked.
+      assertNotNull(function.getIssuedOffset());
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
   @Test
   public void testConsumeFromSpecifiedCommit() throws Exception {
     // write 2 commits first, use the second commit time as the specified start instant,

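The Step 3 assertion rests on the fallback quoted in the test comment: `IncrementalInputSplits#inputSplits` resolves its start completion time as `issuedOffset != null ? issuedOffset : this.conf.getString(FlinkOptions.READ_START_COMMIT)`. A minimal sketch of why a null `issuedOffset` blocks progress (class and method names here are illustrative, not Hudi's API):

public class StartOffsetFallbackSketch {

  // Models the fallback cited in the test comment.
  static String resolveStart(String issuedOffset, String readStartCommit) {
    // Pre-fix, empty commits left issuedOffset null after every poll, so each
    // poll fell back to readStartCommit and re-scanned the same window: with
    // READ_COMMITS_LIMIT=2 and 4 empty commits, the reader never advanced.
    return issuedOffset != null ? issuedOffset : readStartCommit;
  }

  public static void main(String[] args) {
    // Stuck loop: no offset was ever issued, so the start never moves.
    System.out.println(resolveStart(null, "20241108115959000"));
    // After the fix the issued offset advances past the empty commits:
    System.out.println(resolveStart("20241108120000001", "20241108115959000"));
  }
}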