[BugFix] Fix refresh MV error when Hive table has null values in the partition (#51238) #51240

Open · wants to merge 1 commit into main
@@ -487,7 +487,9 @@ public static Map<String, Range<PartitionKey>> getRangePartitionMapOfExternalTab
             if (lowerBound.getKeys().get(0).isNullable()) {
                 // If partition key is NULL literal, rewrite it to min value.
                 lowerBound = PartitionKey.createInfinityPartitionKeyWithType(
-                        ImmutableList.of(partitionColumn.getPrimitiveType()), false);
+                        ImmutableList.of(isConvertToDate ? partitionExpr.getType().getPrimitiveType()
+                                : partitionColumn.getPrimitiveType()), false);
+                lowerBound = convertPartitionKeyIfNeeded(lowerBound, isConvertToDate);
             }
             Preconditions.checkState(!mvPartitionRangeMap.containsKey(partitionKeyName));
             PartitionKey upperBound = nextPartitionKey(lowerBound, partitionDateTimeInterval, partitionColPrimType,
@@ -497,6 +499,13 @@ public static Map<String, Range<PartitionKey>> getRangePartitionMapOfExternalTab
         return mvPartitionRangeMap;
     }
 
+    public static PartitionKey convertPartitionKeyIfNeeded(PartitionKey partitionKey, boolean isConvertToDate) {
+        if (isConvertToDate) {
+            return convertToString(partitionKey);
+        }
+        return partitionKey;
+    }
+
     public static PartitionKey nextPartitionKey(PartitionKey lastPartitionKey,
                                                 DateTimeInterval dateTimeInterval,
                                                 PrimitiveType partitionColPrimType,
@@ -676,6 +685,27 @@ private static PartitionKey convertToDate(PartitionKey partitionKey) throws Sema
         }
     }
 
+    /**
+     * Convert a date-typed partition key into a string-typed partition key.
+     * @param partitionKey the date partition key to convert.
+     * @return a partition key of string type if the input can be converted.
+     */
+    private static PartitionKey convertToString(PartitionKey partitionKey) throws SemanticException {
+        PartitionKey newPartitionKey = new PartitionKey();
+        try {
+            DateLiteral dateLiteral = (DateLiteral) (partitionKey.getKeys().get(0));
+            StringLiteral literalExpr = new StringLiteral(dateLiteral.getStringValue());
+            newPartitionKey.pushColumn(literalExpr, PrimitiveType.VARCHAR);
+            return newPartitionKey;
+        } catch (SemanticException e) {
+            SemanticException semanticException =
+                    new SemanticException("convert date %s to string partition key failed",
+                            partitionKey.getKeys().get(0).getStringValue());
+            semanticException.addSuppressed(e);
+            throw semanticException;
+        }
+    }
+
     private static void putMvPartitionKeyIntoMap(Table table, Column partitionColumn, List<PartitionKey> partitionKeys,
                                                  Map<String, PartitionKey> mvPartitionKeyMap, String partitionValue)
             throws AnalysisException {
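Taken together, the hunks above make the NULL-partition path type-consistent: Hive reports a NULL partition value as `__HIVE_DEFAULT_PARTITION__`, the resulting NULL literal is rewritten to a min-value key, and when the MV is partitioned by str2date(...) that key is now created with the date type and then converted back to a string key, so it stays comparable with the table's other string partition keys. A minimal, self-contained sketch of that round-trip, using plain JDK types and illustrative names rather than the FE classes:

import java.time.LocalDate;
import java.util.List;

// Illustrative sketch only: models the fix's logic with plain JDK types,
// not the StarRocks PartitionKey/PartitionUtil classes.
public class NullPartitionKeySketch {

    static final String HIVE_NULL = "__HIVE_DEFAULT_PARTITION__";
    // Stand-in for the "infinity"/min-value partition key the FE creates.
    static final LocalDate MIN_DATE = LocalDate.of(0, 1, 1);

    // Rewrite one raw Hive partition value into a key comparable with
    // the table's other string partition keys.
    static String toComparableKey(String rawValue, boolean isConvertToDate) {
        if (HIVE_NULL.equals(rawValue)) {
            // NULL partition: rewrite to the min value (created as a date when
            // the MV partitions by str2date), then, mirroring
            // convertPartitionKeyIfNeeded(), convert back to a string key.
            return isConvertToDate ? MIN_DATE.toString() : rawValue;
        }
        return rawValue;
    }

    public static void main(String[] args) {
        for (String v : List.of("2020-01-01", HIVE_NULL, "2020-01-04")) {
            // HIVE_NULL prints as "0000-01-01", a sortable string lower bound.
            System.out.println(v + " -> " + toComparableKey(v, true));
        }
    }
}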
fe/fe-core/src/test/java/com/starrocks/connector/hive/MockedHiveMetadata.java

@@ -725,6 +725,7 @@ public static void mockPartitionTable() {
         mockT1WithMultiPartitionColumns(); // t1_par
         mockT2WithMultiPartitionColumns(); // t2_par
         mockT1WithNullPartitionColumns(); // t1_par_null
+        mockT2WithNullPartitionColumns(); // t2_par_null
         mockTablesWithSinglePartitionColumn();
         mockOrders();
         mockWithMultiDuplicatePartitionColumns();
@@ -1603,6 +1604,67 @@ public static void mockT2WithMultiPartitionColumns() {
                 partitionNames, (long) rowCount, columnStatisticMap, remoteFileInfos));
     }
 
+    public static void mockT2WithNullPartitionColumns() {
+        MOCK_TABLE_MAP.putIfAbsent(MOCKED_PARTITIONED_DB_NAME, new CaseInsensitiveMap<>());
+        Map<String, HiveTableInfo> mockTables = MOCK_TABLE_MAP.get(MOCKED_PARTITIONED_DB_NAME);
+
+        List<FieldSchema> cols = Lists.newArrayList();
+        cols.add(new FieldSchema("c1", "int", null));
+        cols.add(new FieldSchema("c2", "string", null));
+        cols.add(new FieldSchema("c3", "string", null));
+
+        StorageDescriptor sd =
+                new StorageDescriptor(cols, "", MAPRED_PARQUET_INPUT_FORMAT_CLASS, "", false,
+                        -1, null, Lists.newArrayList(), Lists.newArrayList(), Maps.newHashMap());
+        Table t1 = new Table("t2_par_null", "partitioned_db", null, 0, 0, 0, sd,
+                ImmutableList.of(new FieldSchema("par_date", "string", null)), Maps.newHashMap(), null, null,
+                "EXTERNAL_TABLE");
+        List<String> partitionNames = Lists.newArrayList("par_date=2020-01-01", "par_date=2020-01-02",
+                "par_date=2020-01-03", "par_date=__HIVE_DEFAULT_PARTITION__", "par_date=2020-01-03", "par_date=2020-01-04");
+        Map<String, HivePartitionStats> hivePartitionStatsMap = Maps.newHashMap();
+        double avgNumPerPartition = (double) (100 / 3);
+        double rowCount = 100;
+
+        List<PartitionKey> partitionKeyList = Lists.newArrayList();
+        Column partitionColumn1 = new Column("par_date", Type.STRING);
+        List<Column> partitionColumns = ImmutableList.of(partitionColumn1);
+        try {
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-02"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-02"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-03"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("__HIVE_DEFAULT_PARTITION__"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-03"), partitionColumns));
+            partitionKeyList.add(
+                    PartitionUtil.createPartitionKey(ImmutableList.of("2020-01-04"), partitionColumns));
+        } catch (AnalysisException e) {
+            throw new RuntimeException(e);
+        }
+
+        List<String> partitionColumnNames = ImmutableList.of("par_date");
+        ColumnStatistic partitionColumnStats1 =
+                getPartitionColumnStatistic(partitionColumn1, partitionKeyList, partitionColumnNames,
+                        hivePartitionStatsMap, avgNumPerPartition, rowCount);
+
+        Map<String, ColumnStatistic> columnStatisticMap;
+        List<String> colNames = cols.stream().map(FieldSchema::getName).collect(Collectors.toList());
+        columnStatisticMap =
+                colNames.stream().collect(Collectors.toMap(Function.identity(), col -> ColumnStatistic.unknown()));
+        columnStatisticMap.put("par_date", partitionColumnStats1);
+
+        List<RemoteFileInfo> remoteFileInfos = Lists.newArrayList();
+        partitionNames.forEach(
+                k -> remoteFileInfos.add(new RemoteFileInfo(RemoteFileInputFormat.ORC, ImmutableList.of(), null)));
+
+        mockTables.put(t1.getTableName(),
+                new HiveTableInfo(HiveMetastoreApiConverter.toHiveTable(t1, MOCKED_HIVE_CATALOG_NAME),
+                        partitionNames, (long) rowCount, columnStatisticMap, remoteFileInfos));
+    }
+
     public static void mockTablesWithSinglePartitionColumn() {
         MOCK_TABLE_MAP.putIfAbsent(MOCKED_PARTITIONED_DB_NAME, new CaseInsensitiveMap<>());
         Map<String, HiveTableInfo> mockTables = MOCK_TABLE_MAP.get(MOCKED_PARTITIONED_DB_NAME);
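Note before the next file: the mock above registers `par_date=2020-01-03` twice in partitionNames, and the stack trace in the FE UT annotation below shows HiveTableInfo's constructor collecting those names into a map, which throws IllegalStateException on the duplicate key. A standalone sketch of that collection behavior (illustrative code, not the MockedHiveMetadata internals):

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch only: reproduces the duplicate-key failure mode from the
// CI annotation below, not the actual HiveTableInfo constructor.
public class DuplicatePartitionNameSketch {
    public static void main(String[] args) {
        // Same names as the mock, including the duplicated 2020-01-03 entry.
        List<String> partitionNames = List.of(
                "par_date=2020-01-01", "par_date=2020-01-02", "par_date=2020-01-03",
                "par_date=__HIVE_DEFAULT_PARTITION__", "par_date=2020-01-03", "par_date=2020-01-04");
        try {
            // Collectors.toMap without a merge function throws on duplicate keys,
            // matching "Duplicate key par_date=2020-01-03 (attempted merging values ...)".
            Map<String, String> byName = partitionNames.stream()
                    .collect(Collectors.toMap(Function.identity(), n -> "partition-for-" + n));
            System.out.println(byName.keySet());
        } catch (IllegalStateException e) {
            System.out.println("mock setup fails: " + e.getMessage());
        }
    }
}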
fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java

@@ -60,7 +60,7 @@
     @BeforeClass
    public static void beforeClass() throws Exception {
         MVRefreshTestBase.beforeClass();
         ConnectorPlanTestBase.mockHiveCatalog(connectContext);

Check failure on line 63 in fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorHiveTest.java (GitHub Actions / FE UT Report: PartitionBasedMvRefreshProcessorHiveTest)

Raw output:
java.lang.ExceptionInInitializerError
	at com.starrocks.sql.plan.ConnectorPlanTestBase.mockHiveCatalogImpl(ConnectorPlanTestBase.java:166)
	at com.starrocks.sql.plan.ConnectorPlanTestBase.mockHiveCatalog(ConnectorPlanTestBase.java:155)
	at com.starrocks.scheduler.PartitionBasedMvRefreshProcessorHiveTest.beforeClass(PartitionBasedMvRefreshProcessorHiveTest.java:63)
Caused by: java.lang.IllegalStateException: Duplicate key par_date=2020-01-03 (attempted merging values Partition{parameters={transient_lastDdlTime=1726848172}, inputFormat=PARQUET, textFileFormatDesc=null, fullPath='MockedPartitionFullPath/par_date=2020-01-03', isSplittable=false} and Partition{parameters={transient_lastDdlTime=1726848172}, inputFormat=PARQUET, textFileFormatDesc=null, fullPath='MockedPartitionFullPath/par_date=2020-01-03', isSplittable=false})
	at com.starrocks.connector.hive.MockedHiveMetadata$HiveTableInfo.<init>(MockedHiveMetadata.java:1836)
	at com.starrocks.connector.hive.MockedHiveMetadata.mockT2WithNullPartitionColumns(MockedHiveMetadata.java:1664)
	at com.starrocks.connector.hive.MockedHiveMetadata.mockPartitionTable(MockedHiveMetadata.java:728)
	at com.starrocks.connector.hive.MockedHiveMetadata.<clinit>(MockedHiveMetadata.java:101)
	... 3 more

         Config.enable_mv_refresh_query_rewrite = false;
         starRocksAssert.withTable("CREATE TABLE test.tbl1\n" +
                 "(\n" +
@@ -119,7 +119,17 @@
                 "\"storage_medium\" = \"HDD\"\n" +
                 ")\n" +
                 "AS SELECT t1.c1, t1.c2, t1_par.par_col, t1_par.par_date FROM `hive0`.`partitioned_db`.`t1` join " +
-                "`hive0`.`partitioned_db`.`t1_par` using (par_col)");
+                "`hive0`.`partitioned_db`.`t1_par` using (par_col)")
+                .withMaterializedView("CREATE MATERIALIZED VIEW `test`.`hive_parttbl_mv2`\n" +
+                        "COMMENT \"MATERIALIZED_VIEW\"\n" +
+                        "PARTITION BY str2date(`par_date`, '%Y-%m-%d')\n" +
+                        "DISTRIBUTED BY HASH(`c1`) BUCKETS 10\n" +
+                        "REFRESH DEFERRED MANUAL\n" +
+                        "PROPERTIES (\n" +
+                        "\"replication_num\" = \"1\",\n" +
+                        "\"storage_medium\" = \"HDD\"\n" +
+                        ")\n" +
+                        "AS SELECT c1, c2, par_date FROM `hive0`.`partitioned_db`.`t2_par_null`;");
     }
 
     @AfterClass
@@ -1240,4 +1250,21 @@
                 ),
                 calls);
     }
+
+    @Test
+    public void testStr2DateMVRefreshRewriteSingleTableRewrite() throws Exception {
+        Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");
+        MaterializedView materializedView = ((MaterializedView) testDb.getTable("hive_parttbl_mv2"));
+
+        Task task = TaskBuilder.buildMvTask(materializedView, testDb.getFullName());
+        TaskRun taskRun = TaskRunBuilder.newBuilder(task).build();
+        initAndExecuteTaskRun(taskRun);
+        PartitionBasedMvRefreshProcessor processor = (PartitionBasedMvRefreshProcessor)
+                taskRun.getProcessor();
+
+        MvTaskRunContext mvContext = processor.getMvContext();
+        ExecPlan execPlan = mvContext.getExecPlan();
+        String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
+        Assert.assertTrue(plan.contains("t2_par_null"));
+    }
 }