Skip to content

Commit

Permalink
Update mv insert timeout and improve some error codes
Browse files Browse the repository at this point in the history
Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb committed Nov 6, 2024
1 parent ff639b3 commit 3eaaffb
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause modifyT
if (!entry.getKey().startsWith(PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX)) {
throw new SemanticException("Modify failed because unknown properties: " + properties +
", please add `session.` prefix if you want add session variables for mv(" +
"eg, \"session.query_timeout\"=\"30000000\").");
"eg, \"session.insert_timeout\"=\"30000000\").");
}
String varKey = entry.getKey().substring(PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX.length());
SystemVariable variable = new SystemVariable(varKey, new StringLiteral(entry.getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
rewriteTask.setTempPartitionName(tmpPartitionName);
rewriteTask.setLastVersion(partitionLastVersion.get(i));
// use half of the alter timeout as rewrite task timeout
rewriteTask.getProperties().put(SessionVariable.QUERY_TIMEOUT, String.valueOf(timeoutMs / 2000));
rewriteTask.getProperties().put(SessionVariable.INSERT_TIMEOUT, String.valueOf(timeoutMs / 2000));
rewriteTasks.add(rewriteTask);
}

Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public enum ErrorCode {
"No partitions have data available for loading. If you are sure there may be no data to be loaded, " +
"you can use `ADMIN SET FRONTEND CONFIG ('empty_load_as_error' = 'false')` " +
"to ensure such load jobs can succeed"),
ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'2', '2', '0', '0', '0'},
ERR_INSERT_COLUMN_COUNT_MISMATCH(5604, new byte[] {'4', '2', '6', '0', '1'},
"Inserted target column count: %d doesn't match select/value column count: %d"),
ERR_ILLEGAL_BYTES_LENGTH(5605, new byte[] {'4', '2', '0', '0', '0'}, "The valid bytes length for '%s' is [%d, %d]"),
ERR_TOO_MANY_ERROR_ROWS(5606, new byte[] {'2', '2', '0', '0', '0'},
Expand All @@ -328,7 +328,7 @@ public enum ErrorCode {
ERR_ROUTINE_LOAD_OFFSET_INVALID(5607, new byte[] {'0', '2', '0', '0', '0'},
"Consume offset: %d is greater than the latest offset: %d in kafka partition: %d. " +
"You can modify 'kafka_offsets' property through ALTER ROUTINE LOAD and RESUME the job"),
ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'2', '2', '0', '0', '0'}, "%s column: %s has no matching %s column"),
ERR_INSERT_COLUMN_NAME_MISMATCH(5608, new byte[] {'4', '2', '6', '0', '1'}, "%s column: %s has no matching %s column"),

/**
* 5700 - 5799: Partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ private static SystemVariable getMVSystemVariable(Map<String, String> properties
throw new AnalysisException("Analyze materialized properties failed " +
"because unknown properties: " + properties +
", please add `session.` prefix if you want add session variables for mv(" +
"eg, \"session.query_timeout\"=\"30000000\").");
"eg, \"session.insert_timeout\"=\"30000000\").");
}
String varKey = entry.getKey().substring(
PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX.length());
Expand Down
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.starrocks.load.pipe.filelist.FileListRepo;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.SessionVariable;
import com.starrocks.scheduler.Constants;
import com.starrocks.scheduler.ExecuteOption;
import com.starrocks.scheduler.SubmitResult;
Expand Down Expand Up @@ -83,7 +84,7 @@ public class Pipe implements GsonPostProcessable {

private static final ImmutableMap<String, String> DEFAULT_TASK_EXECUTION_VARIABLES =
ImmutableMap.<String, String>builder()
.put("query_timeout", "3600")
.put(SessionVariable.INSERT_TIMEOUT, "3600")
.build();

@SerializedName(value = "name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,16 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);

// session.enable_spill
public static final String MV_SESSION_ENABLE_SPILL =
private static final String MV_SESSION_ENABLE_SPILL =
PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + SessionVariable.ENABLE_SPILL;
// session.query_timeout
public static final String MV_SESSION_TIMEOUT =
// session.query_timeout. Deprecated, only for compatibility with old version
private static final String MV_SESSION_QUERY_TIMEOUT =
PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + SessionVariable.QUERY_TIMEOUT;
// default query timeout for mv: 1 hour
private static final int MV_DEFAULT_QUERY_TIMEOUT = 3600;
// session.insert_timeout
private static final String MV_SESSION_INSERT_TIMEOUT =
PropertyAnalyzer.PROPERTIES_MATERIALIZED_VIEW_SESSION_PREFIX + SessionVariable.INSERT_TIMEOUT;
// default insert timeout for mv: 1 hour
private static final int MV_DEFAULT_INSERT_TIMEOUT = 3600;


private Database db;
Expand Down Expand Up @@ -580,9 +583,15 @@ private void changeDefaultConnectContextIfNeeded(ConnectContext mvConnectCtx) {
mvSessionVariable.setEnableSpill(true);
}

// change `query_timeout` to 1 hour by default for better user experience.
if (!mvProperty.getProperties().containsKey(MV_SESSION_TIMEOUT)) {
mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_QUERY_TIMEOUT);
if (!mvProperty.getProperties().containsKey(MV_SESSION_INSERT_TIMEOUT)) {
if (!mvProperty.getProperties().containsKey(MV_SESSION_QUERY_TIMEOUT)) {
// change `insert_timeout` to 1 hour by default for better user experience.
mvSessionVariable.setInsertTimeoutS(MV_DEFAULT_INSERT_TIMEOUT);
} else {
// for compatibility
mvProperty.getProperties().put(MV_SESSION_INSERT_TIMEOUT,
mvProperty.getProperties().remove(MV_SESSION_QUERY_TIMEOUT));
}
}

// set insert_max_filter_ratio by default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ private static void analyzeSystemVariable(SystemVariable var) {
1L, (long) SessionVariable.MAX_QUERY_TIMEOUT);
}

if (variable.equalsIgnoreCase(SessionVariable.INSERT_TIMEOUT)) {
checkRangeLongVariable(resolvedExpression, SessionVariable.INSERT_TIMEOUT,
1L, (long) SessionVariable.MAX_QUERY_TIMEOUT);
}

if (variable.equalsIgnoreCase(SessionVariable.NEW_PLANNER_OPTIMIZER_TIMEOUT)) {
checkRangeLongVariable(resolvedExpression, SessionVariable.NEW_PLANNER_OPTIMIZER_TIMEOUT, 1L, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,18 +957,18 @@ public void testProperty() throws Exception {
public void testTaskProperties() throws Exception {
mockRepoExecutor();
String pipeName = "p_task_properties";
createPipe("create pipe p_task_properties properties('task.query_timeout'='20') " +
createPipe("create pipe p_task_properties properties('task.insert_timeout'='20') " +
" as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')");
Pipe pipe = getPipe(pipeName);
Assert.assertEquals("{\"task.query_timeout\":\"20\"}", pipe.getPropertiesJson());
Assert.assertEquals(ImmutableMap.of("query_timeout", "20"), pipe.getTaskProperties());
Assert.assertEquals("{\"task.insert_timeout\":\"20\"}", pipe.getPropertiesJson());
Assert.assertEquals(ImmutableMap.of("insert_timeout", "20"), pipe.getTaskProperties());
dropPipe(pipeName);

// default task execution variables
createPipe("create pipe p_task_properties " +
" as insert into tbl1 select * from files('path'='fake://pipe', 'format'='parquet')");
pipe = getPipe(pipeName);
Assert.assertEquals(ImmutableMap.of("query_timeout", "3600"), pipe.getTaskProperties());
Assert.assertEquals(ImmutableMap.of("insert_timeout", "3600"), pipe.getTaskProperties());
}

@Test
Expand Down
8 changes: 4 additions & 4 deletions test/sql/test_materialized_view/R/test_show_materialized_view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) A
FROM `test_show_materialized_view`.`user_tags`
GROUP BY `user_tags`.`user_id`;
-- !result
alter materialized view user_tags_mv1 set ("session.query_timeout" = "3600");
alter materialized view user_tags_mv1 set ("session.insert_timeout" = "3600");
-- result:
-- !result
alter materialized view user_tags_mv1 set ("mv_rewrite_staleness_second" = "3600");
Expand All @@ -57,7 +57,7 @@ PROPERTIES (
"replicated_storage" = "true",
"mv_rewrite_staleness_second" = "3600",
"replication_num" = "1",
"session.query_timeout" = "3600",
"session.insert_timeout" = "3600",
"storage_medium" = "HDD"
)
AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) AS `bitmap_union(to_bitmap(tag_id))`
Expand All @@ -73,7 +73,7 @@ PROPERTIES (
"replicated_storage" = "true",
"mv_rewrite_staleness_second" = "3600",
"replication_num" = "1",
"session.query_timeout" = "3600",
"session.insert_timeout" = "3600",
"storage_medium" = "HDD"
)
AS SELECT `user_tags`.`user_id`, bitmap_union(to_bitmap(`user_tags`.`tag_id`)) AS `bitmap_union(to_bitmap(tag_id))`
Expand All @@ -82,4 +82,4 @@ GROUP BY `user_tags`.`user_id`;
-- !result
drop database test_show_materialized_view;
-- result:
-- !result
-- !result
4 changes: 2 additions & 2 deletions test/sql/test_materialized_view/T/test_show_materialized_view
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ create materialized view user_tags_mv1 distributed by hash(user_id) as select u

show create materialized view user_tags_mv1;
show create table user_tags_mv1;
alter materialized view user_tags_mv1 set ("session.query_timeout" = "3600");
alter materialized view user_tags_mv1 set ("session.insert_timeout" = "3600");
alter materialized view user_tags_mv1 set ("mv_rewrite_staleness_second" = "3600");
show create materialized view user_tags_mv1;
show create table user_tags_mv1;
drop database test_show_materialized_view;
drop database test_show_materialized_view;

0 comments on commit 3eaaffb

Please sign in to comment.