diff --git a/docs/content.zh/docs/dev/table/materialized-table/overview.md b/docs/content.zh/docs/dev/table/materialized-table/overview.md index 9ce5e9ba34f7d..d0499bcf1c921 100644 --- a/docs/content.zh/docs/dev/table/materialized-table/overview.md +++ b/docs/content.zh/docs/dev/table/materialized-table/overview.md @@ -34,7 +34,9 @@ under the License. ## 数据新鲜度 -数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度内刷新。 +数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度目标内刷新。 + +Data freshness is optional when creating a materialized table. If not specified, the system uses the default freshness based on the refresh mode: [materialized-table.default-freshness.continuous]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-continuous) (default: 3 minutes) for CONTINUOUS mode, or [materialized-table.default-freshness.full]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode. 数据新鲜度是物化表的一个关键属性,具有两个主要作用: - **确定刷新模式**:目前有持续模式和全量模式。关于如何确定刷新模式的详细信息,请参阅 [materialized-table.refresh-mode.freshness-threshold]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold) 配置项。 @@ -59,4 +61,3 @@ under the License. 
## Schema 物化表的 `Schema` 定义与普通表相同,可以声明主键和分区字段。其列名和类型会从相应的查询中推导,用户无法手动指定。 - diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md index 6e0e641c96828..a27bce3093fcb 100644 --- a/docs/content.zh/docs/dev/table/materialized-table/statements.md +++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md @@ -35,21 +35,21 @@ Flink SQL 目前支持以下物化表操作: ``` CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name - + [ ([ ]) ] - + [COMMENT table_comment] - + [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] - + [WITH (key1=val1, key2=val2, ...)] - + FRESHNESS = INTERVAL '' { SECOND | MINUTE | HOUR | DAY } - + [REFRESH_MODE = { CONTINUOUS | FULL }] - + AS - + : [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED ``` @@ -69,7 +69,7 @@ AS CREATE MATERIALIZED TABLE my_materialized_table PARTITIONED BY (ds) FRESHNESS = INTERVAL '1' HOUR - AS SELECT + AS SELECT ds FROM ... @@ -103,9 +103,11 @@ CREATE MATERIALIZED TABLE my_materialized_table `FRESHNESS` 用于指定物化表的数据新鲜度。 +`FRESHNESS` is optional. When omitted, the system uses the default freshness based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode. + **数据新鲜度与刷新模式关系** -数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。 +数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。When not specified, it uses the default value from configuration based on the refresh mode. 
它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。 **FRESHNESS 参数详解** @@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR FRESHNESS = INTERVAL '1' DAY ``` +**Default FRESHNESS Example:** +(Assuming `materialized-table.default-freshness.continuous` is 3 minutes, `materialized-table.default-freshness.full` is 1 hour, and `materialized-table.refresh-mode.freshness-threshold` is 30 minutes) + +```sql +-- FRESHNESS is omitted, uses the configured default of 3 minutes for CONTINUOUS mode +-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 3 minutes +CREATE MATERIALIZED TABLE my_materialized_table + AS SELECT * FROM source_table; + +-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the configured default of 1 hour +-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour +CREATE MATERIALIZED TABLE my_materialized_table_full + REFRESH_MODE = FULL + AS SELECT * FROM source_table; +``` + **不合法的 `FRESHNESS` 示例:** ```sql @@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR ``` 注意 +- If FRESHNESS is not specified, the table will use the default freshness interval based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode. - 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。 - 在持续模式下,数据新鲜度和 `checkpoint` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 `checkpoint` 性能,建议[开启 Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。 - 在全量模式下,数据新鲜度会转换为 `cron` 表达式,因此目前仅支持在预定义时间间隔单位内的新鲜度间隔,这种设计确保了与 `cron` 表达式语义的一致性。具体支持以下新鲜度间隔: @@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table FRESHNESS = INTERVAL '1' HOUR REFRESH_MODE = CONTINUOUS AS SELECT - ... + ... 
-- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。 CREATE MATERIALIZED TABLE my_materialized_table FRESHNESS = INTERVAL '10' MINUTE REFRESH_MODE = FULL AS SELECT - ... + ... ``` ## AS @@ -204,22 +223,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' ) FRESHNESS = INTERVAL '10' SECOND - AS - SELECT + AS + SELECT k.ds, k.user_id, COUNT(*) AS event_count, SUM(k.amount) AS total_amount, MAX(u.age) AS max_age - FROM + FROM kafka_catalog.db1.kafka_table k - JOIN + JOIN user_catalog.db1.user_table u - ON + ON k.user_id = u.user_id - WHERE + WHERE k.event_type = 'purchase' - GROUP BY + GROUP BY k.ds, k.user_id ``` @@ -233,22 +252,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_full 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' ) FRESHNESS = INTERVAL '1' HOUR - AS - SELECT + AS + SELECT p.ds, p.product_id, p.product_name, AVG(s.sale_price) AS avg_sale_price, SUM(s.quantity) AS total_quantity - FROM + FROM paimon_catalog.db1.product_table p - LEFT JOIN + LEFT JOIN paimon_catalog.db1.sales_table s - ON + ON p.product_id = s.product_id - WHERE + WHERE p.category = 'electronics' - GROUP BY + GROUP BY p.ds, p.product_id, p.product_name ``` @@ -276,7 +295,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND `SUSPEND` 用于暂停物化表的后台刷新管道。 -**示例:** +**示例:** ```sql -- 暂停前指定 SAVEPOINT 路径 @@ -297,7 +316,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1= `RESUME` 用于恢复物化表的刷新管道。在恢复时,可以通过 `WITH` 子句动态指定物化表的参数,该参数仅对当前恢复的刷新管道生效,并不会持久化到物化表中。 -**示例:** +**示例:** ```sql -- 恢复指定的物化表 @@ -358,21 +377,21 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS }}#materialized-table-default-freshness-continuous) (default: 3 minutes) for CONTINUOUS mode, or [materialized-table.default-freshness.full]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode. 
Data freshness is a crucial attribute of a materialized table, serving two main purposes: - **Determining the Refresh Mode**. Currently, there are CONTINUOUS and FULL modes. For details on how to determine the refresh mode, refer to the [materialized-table.refresh-mode.freshness-threshold]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold) configuration item. diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md index a5c7745397747..07784bb3ede00 100644 --- a/docs/content/docs/dev/table/materialized-table/statements.md +++ b/docs/content/docs/dev/table/materialized-table/statements.md @@ -44,7 +44,7 @@ CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name [WITH (key1=val1, key2=val2, ...)] -FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } +[FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }] [REFRESH_MODE = { CONTINUOUS | FULL }] @@ -69,7 +69,7 @@ AS CREATE MATERIALIZED TABLE my_materialized_table PARTITIONED BY (ds) FRESHNESS = INTERVAL '1' HOUR - AS SELECT + AS SELECT ds FROM ... @@ -103,9 +103,11 @@ As shown in the above example, we specified the date-formatter option for the `d `FRESHNESS` defines the data freshness of a materialized table. +`FRESHNESS` is optional. When omitted, the system uses the default freshness based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode. + **FRESHNESS and Refresh Mode Relationship** -FRESHNESS defines the maximum amount of time that the materialized table’s content should lag behind updates to the base tables.
It does two things, firstly it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold), followed by determines the data refresh frequency to meet the actual data freshness requirements. +FRESHNESS defines the maximum amount of time that the materialized table's content should lag behind updates to the base tables. When FRESHNESS is not specified, the default value from the configuration is used, based on the refresh mode. FRESHNESS serves two purposes: first, it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold); second, it determines the data refresh frequency needed to meet the actual data freshness requirements. **Explanation of FRESHNESS Parameter** @@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR FRESHNESS = INTERVAL '1' DAY ``` +**Default FRESHNESS Example:** +(Assuming `materialized-table.default-freshness.continuous` is 3 minutes, `materialized-table.default-freshness.full` is 1 hour, and `materialized-table.refresh-mode.freshness-threshold` is 30 minutes) + +```sql +-- FRESHNESS is omitted, uses the configured default of 3 minutes for CONTINUOUS mode +-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 3 minutes +CREATE MATERIALIZED TABLE my_materialized_table + AS SELECT * FROM source_table; + +-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the configured default of 1 hour +-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour +CREATE MATERIALIZED TABLE my_materialized_table_full + REFRESH_MODE = FULL + AS SELECT * FROM source_table; +``` + **Invalid `FRESHNESS` Examples:** ```sql @@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5'
HOUR ``` Note +- If FRESHNESS is not specified, the table will use the default freshness interval based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode. - The materialized table data will be refreshed as closely as possible within the defined freshness but cannot guarantee complete satisfaction. - In CONTINUOUS mode, setting a data freshness interval that is too short can impact job performance as it aligns with the checkpoint interval. To optimize checkpoint performance, consider [enabling-changelog]({{< ref "docs/ops/state/state_backends" >}}#incremental-checkpoints). - In FULL mode, data freshness must be translated into a cron expression, consequently, only freshness intervals within predefined time spans are presently accommodated, this design ensures alignment with cron's capabilities. Specifically, support for the following freshness: @@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table FRESHNESS = INTERVAL '1' HOUR REFRESH_MODE = CONTINUOUS AS SELECT - ... + ... -- The refresh mode of the created materialized table is FULL, and the job's schedule cycle is 10 minutes. CREATE MATERIALIZED TABLE my_materialized_table FRESHNESS = INTERVAL '10' MINUTE REFRESH_MODE = FULL AS SELECT - ... + ... 
``` ## AS @@ -204,22 +223,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' ) FRESHNESS = INTERVAL '10' SECOND - AS - SELECT + AS SELECT k.ds, k.user_id, COUNT(*) AS event_count, SUM(k.amount) AS total_amount, MAX(u.age) AS max_age - FROM + FROM kafka_catalog.db1.kafka_table k - JOIN + JOIN user_catalog.db1.user_table u - ON + ON k.user_id = u.user_id - WHERE + WHERE k.event_type = 'purchase' - GROUP BY + GROUP BY k.ds, k.user_id ``` @@ -233,22 +251,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_full 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' ) FRESHNESS = INTERVAL '1' HOUR - AS - SELECT + AS SELECT p.ds, p.product_id, p.product_name, AVG(s.sale_price) AS avg_sale_price, SUM(s.quantity) AS total_quantity - FROM + FROM paimon_catalog.db1.product_table p - LEFT JOIN + LEFT JOIN paimon_catalog.db1.sales_table s - ON + ON p.product_id = s.product_id - WHERE + WHERE p.category = 'electronics' - GROUP BY + GROUP BY p.ds, p.product_id, p.product_name ``` diff --git a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html index d5829bf3224e7..f3cb3972a730d 100644 --- a/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html +++ b/docs/layouts/shortcodes/generated/materialized_table_config_configuration.html @@ -8,6 +8,18 @@ + +
materialized-table.default-freshness.continuous

Batch Streaming + 3 min + Duration + The default freshness interval for continuous refresh mode when the FRESHNESS clause is omitted in a materialized table definition. + + +
materialized-table.default-freshness.full

Batch Streaming + 1 h + Duration + The default freshness interval for full refresh mode when the FRESHNESS clause is omitted in a materialized table definition. +
materialized-table.refresh-mode.freshness-threshold

Batch Streaming 30 min diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 323e0ca17b757..bafb36507117c 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -83,6 +83,7 @@ import java.io.IOException; import java.net.URLClassLoader; +import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; @@ -108,13 +109,12 @@ import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.SCHEDULE_TIME_DATE_FORMATTER_DEFAULT; import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK; import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; +import static org.apache.flink.table.catalog.IntervalFreshness.convertFreshnessToCron; import static org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil.WORKFLOW_SCHEDULER_PREFIX; import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig; import static org.apache.flink.table.gateway.service.utils.Constants.CLUSTER_INFO; import static org.apache.flink.table.gateway.service.utils.Constants.JOB_ID; import static org.apache.flink.table.utils.DateTimeUtils.formatTimestampStringWithOffset; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration; /** Manager is responsible for execute the {@link MaterializedTableOperation}. 
*/ @Internal @@ -166,8 +166,7 @@ public void close() throws Exception { public ResultFetcher callMaterializedTableOperation( OperationExecutor operationExecutor, OperationHandle handle, - MaterializedTableOperation op, - String statement) { + MaterializedTableOperation op) { if (op instanceof CreateMaterializedTableOperation) { return callCreateMaterializedTableOperation( operationExecutor, handle, (CreateMaterializedTableOperation) op); @@ -260,9 +259,8 @@ private void createMaterializedTableInFullMode( ResolvedCatalogMaterializedTable catalogMaterializedTable = createMaterializedTableOperation.getCatalogMaterializedTable(); - // convert duration to cron expression - String cronExpression = - convertFreshnessToCron(catalogMaterializedTable.getDefinitionFreshness()); + final IntervalFreshness freshness = catalogMaterializedTable.getDefinitionFreshness(); + String cronExpression = convertFreshnessToCron(freshness); // create full refresh job CreateRefreshWorkflow createRefreshWorkflow = new CreatePeriodicRefreshWorkflow( @@ -306,7 +304,7 @@ private ResultFetcher callAlterMaterializedTableSuspend( OperationHandle handle, AlterMaterializedTableSuspendOperation op) { ObjectIdentifier tableIdentifier = op.getTableIdentifier(); - CatalogMaterializedTable materializedTable = + ResolvedCatalogMaterializedTable materializedTable = getCatalogMaterializedTable(operationExecutor, tableIdentifier); // Initialization phase doesn't support resume operation. @@ -420,7 +418,7 @@ private ResultFetcher callAlterMaterializedTableResume( OperationHandle handle, AlterMaterializedTableResumeOperation op) { ObjectIdentifier tableIdentifier = op.getTableIdentifier(); - CatalogMaterializedTable catalogMaterializedTable = + ResolvedCatalogMaterializedTable catalogMaterializedTable = getCatalogMaterializedTable(operationExecutor, tableIdentifier); // Initialization phase doesn't support resume operation. 
@@ -456,7 +454,7 @@ private void resumeContinuousRefreshJob( OperationExecutor operationExecutor, OperationHandle handle, ObjectIdentifier tableIdentifier, - CatalogMaterializedTable catalogMaterializedTable, + ResolvedCatalogMaterializedTable catalogMaterializedTable, Map dynamicOptions) { ContinuousRefreshHandler refreshHandler = deserializeContinuousHandler( @@ -562,9 +560,10 @@ private void executeContinuousRefreshJob( .getSessionContext() .getSessionConf() .contains(CheckpointingOptions.CHECKPOINTING_INTERVAL)) { - customConfig.set( - CheckpointingOptions.CHECKPOINTING_INTERVAL, - catalogMaterializedTable.getFreshness()); + + final Duration freshness = + validateAndGetIntervalFreshness(catalogMaterializedTable).toDuration(); + customConfig.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, freshness); } String insertStatement = @@ -727,7 +726,7 @@ static Map getPeriodRefreshPartition( SCHEDULE_TIME_DATE_FORMATTER_DEFAULT, partFieldFormatter, TimeZone.getTimeZone(localZoneId), - -convertFreshnessToDuration(freshness).toMillis()); + -freshness.toDuration().toMillis()); if (partFiledValue == null) { throw new SqlExecutionException( String.format( @@ -818,7 +817,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation( OperationHandle handle, AlterMaterializedTableChangeOperation op) { ObjectIdentifier tableIdentifier = op.getTableIdentifier(); - CatalogMaterializedTable oldMaterializedTable = + ResolvedCatalogMaterializedTable oldMaterializedTable = getCatalogMaterializedTable(operationExecutor, tableIdentifier); if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { @@ -995,12 +994,12 @@ private ResultFetcher callDropMaterializedTableOperation( } } - CatalogMaterializedTable materializedTable = + ResolvedCatalogMaterializedTable materializedTable = getCatalogMaterializedTable(operationExecutor, tableIdentifier); - CatalogMaterializedTable.RefreshMode refreshMode = materializedTable.getRefreshMode(); 
CatalogMaterializedTable.RefreshStatus refreshStatus = materializedTable.getRefreshStatus(); if (CatalogMaterializedTable.RefreshStatus.ACTIVATED == refreshStatus || CatalogMaterializedTable.RefreshStatus.SUSPENDED == refreshStatus) { + CatalogMaterializedTable.RefreshMode refreshMode = materializedTable.getRefreshMode(); if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) { deleteRefreshWorkflow(tableIdentifier, materializedTable); } else if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == refreshMode @@ -1008,8 +1007,7 @@ private ResultFetcher callDropMaterializedTableOperation( cancelContinuousRefreshJob( operationExecutor, handle, tableIdentifier, materializedTable); } - } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING - == materializedTable.getRefreshStatus()) { + } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING == refreshStatus) { throw new ValidationException( String.format( "Current refresh status of materialized table %s is initializing, skip the drop operation.", @@ -1375,4 +1373,10 @@ private static Optional getClusterIdKeyName(String targetName) { return Optional.empty(); } } + + private static IntervalFreshness validateAndGetIntervalFreshness( + final CatalogMaterializedTable catalogMaterializedTable) { + return Optional.ofNullable(catalogMaterializedTable.getDefinitionFreshness()) + .orElseThrow(() -> new SqlExecutionException("Freshness cannot be null")); + } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index acf5ac7d77205..5735abd6dee31 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -556,8 +556,7 @@ 
private ResultFetcher executeOperation( return sessionContext .getSessionState() .materializedTableManager - .callMaterializedTableOperation( - this, handle, (MaterializedTableOperation) op, statement); + .callMaterializedTableOperation(this, handle, (MaterializedTableOperation) op); } else { return callOperation(tableEnv, handle, op); } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 2e2499e75ae59..de73723176bdb 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -142,7 +143,6 @@ void testCreateMaterializedTableInContinuousMode() throws Exception { Column.physical("pv", DataTypes.INT().notNull()))); assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema); - assertThat(actualMaterializedTable.getFreshness()).isEqualTo(Duration.ofSeconds(30)); assertThat(actualMaterializedTable.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); assertThat(actualMaterializedTable.getRefreshMode()) @@ -178,6 +178,191 @@ void testCreateMaterializedTableInContinuousMode() throws Exception { ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"users_shops")); } + @Test + void testCreateMaterializedTableInFullModeWithoutFreshness() throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " REFRESH_MODE = FULL\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // validate materialized table: schema, refresh mode, refresh status, refresh handler, + // doesn't check the data because it generates randomly. 
+ ResolvedCatalogMaterializedTable actualMaterializedTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Expected schema + ResolvedSchema expectedSchema = + ResolvedSchema.of( + Arrays.asList( + Column.physical("user_id", DataTypes.BIGINT()), + Column.physical("shop_id", DataTypes.BIGINT()), + Column.physical("ds", DataTypes.STRING()), + Column.physical("payed_buy_fee_sum", DataTypes.BIGINT()), + Column.physical("pv", DataTypes.INT().notNull()))); + + assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema); + assertThat(actualMaterializedTable.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL); + assertThat(actualMaterializedTable.getRefreshMode()).isEqualTo(RefreshMode.FULL); + assertThat(actualMaterializedTable.getRefreshStatus()) + .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); + assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty(); + assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty(); + + // verify refresh handler + byte[] serializedHandler = actualMaterializedTable.getSerializedRefreshHandler(); + EmbeddedRefreshHandler embeddedRefreshHandler = + EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize( + serializedHandler, getClass().getClassLoader()); + assertThat(embeddedRefreshHandler.getWorkflowName()) + .isEqualTo( + "quartz_job_" + + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops") + .asSerializableString()); + + EmbeddedQuartzScheduler embeddedWorkflowScheduler = + SQL_GATEWAY_REST_ENDPOINT_EXTENSION + .getSqlGatewayRestEndpoint() + .getQuartzScheduler(); + JobKey jobKey = + new JobKey( + embeddedRefreshHandler.getWorkflowName(), + embeddedRefreshHandler.getWorkflowGroup()); + + // verify the job is created + 
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue(); + + // verify initialization conf + JobDetail jobDetail = embeddedWorkflowScheduler.getQuartzScheduler().getJobDetail(jobKey); + String workflowJsonStr = jobDetail.getJobDataMap().getString(WORKFLOW_INFO); + WorkflowInfo workflowInfo = fromJson(workflowJsonStr, WorkflowInfo.class); + assertThat(workflowInfo.getInitConfig()) + .containsEntry("k1", "v1") + .containsEntry("k2", "v2") + .containsKey("sql-gateway.endpoint.rest.address") + .containsKey("sql-gateway.endpoint.rest.port") + .containsKey("table.catalog-store.kind") + .containsKey("table.catalog-store.file.path") + .doesNotContainKey(WORKFLOW_SCHEDULER_TYPE.key()) + .doesNotContainKey(RESOURCES_DOWNLOAD_DIR.key()); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testCreateMaterializedTableInContinuousModeWithoutFreshnessAndRefreshMode() + throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // validate materialized table: schema, refresh mode, refresh status, refresh handler, + // doesn't check the data because it generates randomly. 
+ ResolvedCatalogMaterializedTable actualMaterializedTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Expected schema + ResolvedSchema expectedSchema = + ResolvedSchema.of( + Arrays.asList( + Column.physical("user_id", DataTypes.BIGINT()), + Column.physical("shop_id", DataTypes.BIGINT()), + Column.physical("ds", DataTypes.STRING()), + Column.physical("payed_buy_fee_sum", DataTypes.BIGINT()), + Column.physical("pv", DataTypes.INT().notNull()))); + + assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema); + assertThat(actualMaterializedTable.getDefinitionFreshness()) + .isEqualTo(IntervalFreshness.ofMinute("3")); + assertThat(actualMaterializedTable.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); + assertThat(actualMaterializedTable.getRefreshMode()) + .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS); + assertThat(actualMaterializedTable.getRefreshStatus()) + .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); + assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty(); + assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty(); + + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + actualMaterializedTable.getSerializedRefreshHandler(), + getClass().getClassLoader()); + + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // verify the background job is running + String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId()); + OperationHandle describeJobHandle = + service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, describeJobHandle); + List jobResults = fetchAllResults(service, 
sessionHandle, describeJobHandle); + assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); + + // get checkpoint interval + long checkpointInterval = + getCheckpointIntervalConfig(restClusterClient, activeRefreshHandler.getJobId()); + + // default checkpoint interval is 3 minutes + final int expectedCheckpointInterval = 60 * 3 * 1000; + assertThat(checkpointInterval).isEqualTo(expectedCheckpointInterval); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + @Test void testCreateMaterializedTableInContinuousModeWithCustomCheckpointInterval() throws Exception { @@ -348,7 +533,7 @@ void testCreateMaterializedTableInFullMode() throws Exception { } @Test - void testCreateMaterializedTableFailedInInContinuousMode() throws Exception { + void testCreateMaterializedTableFailedInInContinuousMode() { // create a materialized table with invalid SQL String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 5717da2e51f94..9200c1dc2ac33 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1907,16 +1907,18 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar propertyList = Properties() ] - - freshness = Expression(ExprContext.ACCEPT_NON_QUERY) - { - if (!(freshness instanceof SqlIntervalLiteral)) + [ + + freshness = Expression(ExprContext.ACCEPT_NON_QUERY) { - throw SqlUtil.newContextException( - getPos(), - ParserResource.RESOURCE.unsupportedFreshnessType()); + if (!(freshness instanceof SqlIntervalLiteral)) + { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.unsupportedFreshnessType()); + } } - } + ] [ ( diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java index 5a9245c5dd02d..8c7fe2d508943 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java @@ -61,7 +61,7 @@ public class SqlCreateMaterializedTable extends SqlCreate { private final SqlNodeList propertyList; - private final SqlIntervalLiteral freshness; + private final @Nullable SqlIntervalLiteral freshness; private final @Nullable SqlLiteral refreshMode; @@ -75,7 +75,7 @@ public SqlCreateMaterializedTable( @Nullable SqlDistribution distribution, SqlNodeList partitionKeyList, SqlNodeList propertyList, - SqlIntervalLiteral freshness, + @Nullable SqlIntervalLiteral freshness, @Nullable SqlLiteral refreshMode, SqlNode asQuery) { super(OPERATOR, pos, false, false); @@ -86,7 +86,7 @@ public SqlCreateMaterializedTable( this.partitionKeyList = requireNonNull(partitionKeyList, "partitionKeyList should not be null"); this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); - this.freshness = requireNonNull(freshness, "freshness should not be null"); + this.freshness = freshness; this.refreshMode = refreshMode; this.asQuery = requireNonNull(asQuery, "asQuery should not be null"); } @@ -136,12 +136,14 @@ public SqlNodeList getPropertyList() { return propertyList; } + @Nullable public SqlIntervalLiteral getFreshness() { return freshness; } - public Optional getRefreshMode() { - return Optional.ofNullable(refreshMode); + @Nullable + public SqlLiteral getRefreshMode() { + return refreshMode; } public SqlNode getAsQuery() { @@ -195,10 +197,12 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.endList(withFrame); } - 
writer.newlineAndIndent(); - writer.keyword("FRESHNESS"); - writer.keyword("="); - freshness.unparse(writer, leftPrec, rightPrec); + if (freshness != null) { + writer.newlineAndIndent(); + writer.keyword("FRESHNESS"); + writer.keyword("="); + freshness.unparse(writer, leftPrec, rightPrec); + } if (refreshMode != null) { writer.newlineAndIndent(); diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java index 0e58ffbb54669..d089ecc807921 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java @@ -63,26 +63,6 @@ void testCreateMaterializedTableWithUnsupportedFreshnessInterval() { + "AS SELECT a, b, h, t m FROM source"; sql(sql).fails( "MATERIALIZED TABLE only supports define interval type FRESHNESS, please refer to the materialized table document."); - - final String sql2 = - "CREATE MATERIALIZED TABLE tbl1\n" - + "(\n" - + " PRIMARY KEY (a, b)\n" - + ")\n" - + "COMMENT 'table comment'\n" - + "PARTITIONED BY (a, h)\n" - + "WITH (\n" - + " 'group.id' = 'latest', \n" - + " 'kafka.topic' = 'log.test'\n" - + ")\n" - + "^AS^ SELECT a, b, h, t m FROM source"; - - sql(sql2) - .fails( - "Encountered \"AS\" at line 11, column 1.\n" - + "Was expecting:\n" - + " \"FRESHNESS\" ...\n" - + " "); } @Test @@ -395,7 +375,8 @@ private static Stream inputForCreateMaterializedTable() { return Stream.of( Arguments.of(fullExample()), Arguments.of(withoutTableConstraint()), - Arguments.of(withPrimaryKey())); + Arguments.of(withPrimaryKey()), + Arguments.of(withoutFreshness())); } private static Map.Entry fullExample() { @@ -411,7 +392,7 @@ private static Map.Entry fullExample() { + " 'group.id' = 'latest', \n" + " 
'kafka.topic' = 'log.test'\n" + ")\n" - + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "FRESHNESS = INTERVAL '3' MINUTES\n" + "AS SELECT a, b, h, t m FROM source", "CREATE MATERIALIZED TABLE `TBL1`\n" + "(\n" @@ -456,7 +437,7 @@ private static Map.Entry withoutTableConstraint() { return new AbstractMap.SimpleEntry<>( "CREATE MATERIALIZED TABLE tbl1\n" + "COMMENT 'table comment'\n" - + "FRESHNESS = INTERVAL '3' DAY\n" + + "FRESHNESS = INTERVAL '3' DAYS\n" + "REFRESH_MODE = FULL\n" + "AS SELECT a, b, h, t m FROM source", "CREATE MATERIALIZED TABLE `TBL1`\n" @@ -467,4 +448,28 @@ private static Map.Entry withoutTableConstraint() { + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + "FROM `SOURCE`"); } + + private static Map.Entry withoutFreshness() { + return new AbstractMap.SimpleEntry<>( + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "AS SELECT a, b, h, t m FROM source", + "CREATE MATERIALIZED TABLE `TBL1`\n" + + "(\n" + + " PRIMARY KEY (`A`, `B`)\n" + + ")\n" + + "WITH (\n" + + " 'group.id' = 'latest',\n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "AS\n" + + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + + "FROM `SOURCE`"); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java index 56cebedd191e8..e387705e1a3a0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/MaterializedTableConfigOptions.java @@ -58,4 +58,20 @@ private MaterializedTableConfigOptions() {} .withDescription( "Specifies the time partition formatter for the partitioned materialized table, where '#' denotes a string-based 
partition field name." + " This serves as a hint to the framework regarding which partition to refresh in full refresh mode."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption MATERIALIZED_TABLE_DEFAULT_FRESHNESS_CONTINUOUS = + key("materialized-table.default-freshness.continuous") + .durationType() + .defaultValue(Duration.ofMinutes(3)) + .withDescription( + "The default freshness interval for continuous refresh mode when the FRESHNESS clause is omitted in a materialized table definition."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL = + key("materialized-table.default-freshness.full") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDescription( + "The default freshness interval for full refresh mode when the FRESHNESS clause is omitted in a materialized table definition."); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 24ebe07e9eaac..bc8708578073c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -27,8 +27,10 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.config.MaterializedTableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.exceptions.CatalogException; import 
org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -65,6 +67,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -83,6 +86,7 @@ import java.util.stream.Stream; import static java.lang.String.format; +import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -124,13 +128,16 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { private SqlFactory sqlFactory; + private final MaterializedTableEnricher materializedTableEnricher; + private CatalogManager( String defaultCatalogName, Catalog defaultCatalog, DataTypeFactory typeFactory, List catalogModificationListeners, CatalogStoreHolder catalogStoreHolder, - SqlFactory sqlFactory) { + SqlFactory sqlFactory, + MaterializedTableEnricher materializedTableEnricher) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName), "Default catalog name cannot be null or empty"); @@ -153,6 +160,8 @@ private CatalogManager( this.catalogStoreHolder = catalogStoreHolder; this.sqlFactory = sqlFactory; + this.materializedTableEnricher = + checkNotNull(materializedTableEnricher, "MaterializedTableEnricher cannot be null"); } @VisibleForTesting @@ -190,6 +199,8 @@ public static final class Builder { private SqlFactory sqlFactory = DefaultSqlFactory.INSTANCE; + private MaterializedTableEnricher materializedTableEnricher; + public Builder classLoader(ClassLoader classLoader) { this.classLoader = classLoader; return this; @@ -232,6 +243,12 @@ public Builder sqlFactory(SqlFactory sqlFactory) { return this; } + public Builder materializedTableEnricher( + MaterializedTableEnricher materializedTableEnricher) { + 
this.materializedTableEnricher = materializedTableEnricher; + return this; + } + public CatalogManager build() { checkNotNull(classLoader, "Class loader cannot be null"); checkNotNull(config, "Config cannot be null"); @@ -249,7 +266,29 @@ public CatalogManager build() { : executionConfig.getSerializerConfig()), catalogModificationListeners, catalogStoreHolder, - sqlFactory); + sqlFactory, + materializedTableEnricher != null + ? materializedTableEnricher + : createDefaultMaterializedTableEnricher()); + } + + private MaterializedTableEnricher createDefaultMaterializedTableEnricher() { + final Duration defaultDurationContinuous = + catalogStoreHolder + .config() + .get( + MaterializedTableConfigOptions + .MATERIALIZED_TABLE_DEFAULT_FRESHNESS_CONTINUOUS); + final Duration defaultDurationFull = + catalogStoreHolder + .config() + .get( + MaterializedTableConfigOptions + .MATERIALIZED_TABLE_DEFAULT_FRESHNESS_FULL); + final Duration freshnessThreshold = + catalogStoreHolder.config().get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD); + return DefaultMaterializedTableEnricher.create( + defaultDurationContinuous, defaultDurationFull, freshnessThreshold); } } @@ -1869,6 +1908,11 @@ public ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable( final ResolvedSchema resolvedSchema = table.getUnresolvedSchema().resolve(schemaResolver); + final MaterializedTableEnrichmentResult enrichmentResult = + this.materializedTableEnricher.enrich(table); + IntervalFreshness freshness = enrichmentResult.getFreshness(); + RefreshMode resolvedRefreshMode = enrichmentResult.getRefreshMode(); + // Validate partition keys are included in physical columns final List physicalColumns = resolvedSchema.getColumns().stream() @@ -1888,7 +1932,8 @@ public ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable( } }); - return new ResolvedCatalogMaterializedTable(table, resolvedSchema); + return new ResolvedCatalogMaterializedTable( + table, resolvedSchema, resolvedRefreshMode, freshness); } 
/** Resolves a {@link CatalogView} to a validated {@link ResolvedCatalogView}. */ diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java index 872dbb4ccd784..f567658c7c14d 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java @@ -386,6 +386,8 @@ private static ResolvedCatalogMaterializedTable createResolvedMaterialized( .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC) .refreshStatus(RefreshStatus.ACTIVATED) .build(), - resolvedSchema); + resolvedSchema, + refreshMode, + freshness); } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java index 9b8ab94a421f0..2ef20f7f80cde 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java @@ -197,8 +197,8 @@ void testCatalogMaterializedTableResolution() { assertThat(resolvedMaterializedTable.getDefinitionQuery()) .isEqualTo(materializedTable.getDefinitionQuery()); - assertThat(resolvedMaterializedTable.getFreshness()) - .isEqualTo(materializedTable.getFreshness()); + assertThat(resolvedMaterializedTable.getDefinitionFreshness()) + .isEqualTo(materializedTable.getDefinitionFreshness()); assertThat(resolvedMaterializedTable.getLogicalRefreshMode()) .isEqualTo(materializedTable.getLogicalRefreshMode()); assertThat(resolvedMaterializedTable.getRefreshMode()) diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java index a2bbeba1c59e3..f0691d6252c4f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java @@ -30,8 +30,6 @@ import java.util.Map; import java.util.Optional; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration; - /** * Represents the unresolved metadata of a materialized table in a {@link Catalog}. * @@ -126,20 +124,27 @@ CatalogMaterializedTable copy( * Get the definition freshness of materialized table which is used to determine the physical * refresh mode. */ + @Nullable IntervalFreshness getDefinitionFreshness(); /** * Get the {@link Duration} value of materialized table definition freshness, it is converted * from {@link IntervalFreshness}. + * + * @deprecated use {@link #getDefinitionFreshness()} together with {@link + * IntervalFreshness#toDuration()} instead. */ - default Duration getFreshness() { - return convertFreshnessToDuration(getDefinitionFreshness()); + @Deprecated + default @Nullable Duration getFreshness() { + final IntervalFreshness definitionFreshness = getDefinitionFreshness(); + return definitionFreshness == null ? null : definitionFreshness.toDuration(); } /** Get the logical refresh mode of materialized table. */ LogicalRefreshMode getLogicalRefreshMode(); /** Get the physical refresh mode of materialized table. */ + @Nullable RefreshMode getRefreshMode(); /** Get the refresh status of materialized table. 
*/ @@ -205,9 +210,9 @@ class Builder { private Map options = Collections.emptyMap(); private @Nullable Long snapshot; private String definitionQuery; - private IntervalFreshness freshness; + private @Nullable IntervalFreshness freshness; private LogicalRefreshMode logicalRefreshMode; - private RefreshMode refreshMode; + private @Nullable RefreshMode refreshMode; private RefreshStatus refreshStatus; private @Nullable String refreshHandlerDescription; private @Nullable byte[] serializedRefreshHandler; @@ -247,8 +252,8 @@ public Builder definitionQuery(String definitionQuery) { return this; } - public Builder freshness(IntervalFreshness freshness) { - this.freshness = Preconditions.checkNotNull(freshness, "Freshness must not be null."); + public Builder freshness(@Nullable IntervalFreshness freshness) { + this.freshness = freshness; return this; } @@ -259,9 +264,8 @@ public Builder logicalRefreshMode(LogicalRefreshMode logicalRefreshMode) { return this; } - public Builder refreshMode(RefreshMode refreshMode) { - this.refreshMode = - Preconditions.checkNotNull(refreshMode, "Refresh mode must not be null."); + public Builder refreshMode(@Nullable RefreshMode refreshMode) { + this.refreshMode = refreshMode; return this; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java index 9378e862ab273..265e16fde931d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java @@ -60,9 +60,9 @@ protected DefaultCatalogMaterializedTable( Map options, @Nullable Long snapshot, String definitionQuery, - IntervalFreshness freshness, + @Nullable IntervalFreshness freshness, LogicalRefreshMode logicalRefreshMode, - 
RefreshMode refreshMode, + @Nullable RefreshMode refreshMode, RefreshStatus refreshStatus, @Nullable String refreshHandlerDescription, @Nullable byte[] serializedRefreshHandler) { @@ -73,10 +73,10 @@ protected DefaultCatalogMaterializedTable( this.options = checkNotNull(options, "Options must not be null."); this.snapshot = snapshot; this.definitionQuery = checkNotNull(definitionQuery, "Definition query must not be null."); - this.freshness = checkNotNull(freshness, "Freshness must not be null."); + this.freshness = freshness; this.logicalRefreshMode = checkNotNull(logicalRefreshMode, "Logical refresh mode must not be null."); - this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be null."); + this.refreshMode = refreshMode; this.refreshStatus = checkNotNull(refreshStatus, "Refresh status must not be null."); this.refreshHandlerDescription = refreshHandlerDescription; this.serializedRefreshHandler = serializedRefreshHandler; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java new file mode 100644 index 0000000000000..c9858dab7d09e --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultMaterializedTableEnricher.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; + +import java.time.Duration; + +/** + * Default implementation of {@link MaterializedTableEnricher}. + * + *

Applies default freshness values based on refresh mode and determines the physical refresh + * mode using freshness threshold comparison. + */ +@Internal +public class DefaultMaterializedTableEnricher implements MaterializedTableEnricher { + + private final IntervalFreshness defaultContinuousFreshness; + private final IntervalFreshness defaultFullFreshness; + private final Duration freshnessThreshold; + + public static DefaultMaterializedTableEnricher create( + final Duration defaultContinuousFreshness, + final Duration defaultFullFreshness, + final Duration freshnessThreshold) { + final IntervalFreshness continuousFreshness = + IntervalFreshness.fromDuration(defaultContinuousFreshness); + final IntervalFreshness fullFreshness = + IntervalFreshness.fromDuration(defaultFullFreshness); + return new DefaultMaterializedTableEnricher( + continuousFreshness, fullFreshness, freshnessThreshold); + } + + private DefaultMaterializedTableEnricher( + final IntervalFreshness defaultContinuousFreshness, + final IntervalFreshness defaultFullFreshness, + final Duration freshnessThreshold) { + this.defaultContinuousFreshness = defaultContinuousFreshness; + this.defaultFullFreshness = defaultFullFreshness; + this.freshnessThreshold = freshnessThreshold; + } + + @Override + public MaterializedTableEnrichmentResult enrich(final CatalogMaterializedTable table) { + // Determine the final freshness value + final IntervalFreshness finalFreshness = deriveFreshness(table); + + // Derive the final refresh mode using the freshness and threshold + final RefreshMode finalRefreshMode = + deriveRefreshMode( + table.getLogicalRefreshMode(), finalFreshness, freshnessThreshold); + + return new MaterializedTableEnrichmentResult(finalFreshness, finalRefreshMode); + } + + /** + * Returns user-specified freshness or applies mode-specific defaults: FULL mode uses {@code + * defaultFullFreshness}, others use {@code defaultContinuousFreshness}. 
+ */ + private IntervalFreshness deriveFreshness(final CatalogMaterializedTable table) { + final IntervalFreshness finalFreshness; + if (table.getDefinitionFreshness() != null) { + // User provided an explicit freshness, use it + finalFreshness = table.getDefinitionFreshness(); + } else { + // User omitted freshness, choose default based on logical mode + if (table.getLogicalRefreshMode() == LogicalRefreshMode.FULL) { + finalFreshness = defaultFullFreshness; + } else { + // For AUTOMATIC or CONTINUOUS modes, use the continuous default + finalFreshness = defaultContinuousFreshness; + } + } + return finalFreshness; + } + + /** + * Determines physical refresh mode: CONTINUOUS if freshness is below threshold or explicitly + * requested, otherwise FULL (validated for cron conversion). + */ + public RefreshMode deriveRefreshMode( + LogicalRefreshMode logicalRefreshMode, + IntervalFreshness freshness, + Duration threshold) { + if (logicalRefreshMode != LogicalRefreshMode.FULL) { + final Duration definedFreshness = freshness.toDuration(); + if (logicalRefreshMode == LogicalRefreshMode.CONTINUOUS + || definedFreshness.compareTo(threshold) < 0) { + return RefreshMode.CONTINUOUS; + } + } + + // Validate that freshness can be converted to cron for FULL mode + IntervalFreshness.validateFreshnessForCron(freshness); + return RefreshMode.FULL; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java index fdbb8e6139fa0..6548b14966390 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/IntervalFreshness.java @@ -19,7 +19,10 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; +import 
java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Objects; /** @@ -31,35 +34,186 @@ @PublicEvolving public class IntervalFreshness { - private final String interval; + private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * * * ? *"; + private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * * * ? *"; + private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * * ? *"; + private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * * ? *"; + private static final long SECOND_CRON_UPPER_BOUND = 60; + private static final long MINUTE_CRON_UPPER_BOUND = 60; + private static final long HOUR_CRON_UPPER_BOUND = 24; + + private final int interval; private final TimeUnit timeUnit; - private IntervalFreshness(String interval, TimeUnit timeUnit) { + private IntervalFreshness(int interval, TimeUnit timeUnit) { this.interval = interval; this.timeUnit = timeUnit; } public static IntervalFreshness of(String interval, TimeUnit timeUnit) { - return new IntervalFreshness(interval, timeUnit); + final int validateIntervalInput = validateIntervalInput(interval); + return new IntervalFreshness(validateIntervalInput, timeUnit); + } + + private static int validateIntervalInput(final String interval) { + final int parsedInt; + try { + parsedInt = Integer.parseInt(interval); + } catch (Exception e) { + final String errorMessage = + String.format( + "The freshness interval currently only supports positive integer type values. But was: %s", + interval); + throw new ValidationException(errorMessage, e); + } + + if (parsedInt <= 0) { + final String errorMessage = + String.format( + "The freshness interval currently only supports positive integer type values. 
But was: %s", + interval); + throw new ValidationException(errorMessage); + } + return parsedInt; } public static IntervalFreshness ofSecond(String interval) { - return new IntervalFreshness(interval, TimeUnit.SECOND); + return IntervalFreshness.of(interval, TimeUnit.SECOND); } public static IntervalFreshness ofMinute(String interval) { - return new IntervalFreshness(interval, TimeUnit.MINUTE); + return IntervalFreshness.of(interval, TimeUnit.MINUTE); } public static IntervalFreshness ofHour(String interval) { - return new IntervalFreshness(interval, TimeUnit.HOUR); + return IntervalFreshness.of(interval, TimeUnit.HOUR); } public static IntervalFreshness ofDay(String interval) { - return new IntervalFreshness(interval, TimeUnit.DAY); + return IntervalFreshness.of(interval, TimeUnit.DAY); + } + + /** + * Validates that the given freshness can be converted to a cron expression in full refresh + * mode. Since a freshness and a cron expression cannot be converted equivalently, only a + * limited set of freshness patterns is currently supported. + * + * @param intervalFreshness the freshness to validate + * @throws ValidationException if the freshness cannot be converted to a valid cron expression + */ + public static void validateFreshnessForCron(IntervalFreshness intervalFreshness) { + switch (intervalFreshness.getTimeUnit()) { + case SECOND: + validateCronConstraints(intervalFreshness, SECOND_CRON_UPPER_BOUND); + break; + case MINUTE: + validateCronConstraints(intervalFreshness, MINUTE_CRON_UPPER_BOUND); + break; + case HOUR: + validateCronConstraints(intervalFreshness, HOUR_CRON_UPPER_BOUND); + break; + case DAY: + validateDayConstraints(intervalFreshness); + break; + default: + throw new ValidationException( + String.format( + "Unknown freshness time unit: %s.", + intervalFreshness.getTimeUnit())); + } + } + + /** + * Converts the freshness of materialized table to cron expression in full refresh mode. 
The + * freshness must first pass validation via {@link #validateFreshnessForCron}. + * + * @param intervalFreshness the freshness to convert + * @return the corresponding cron expression + * @throws ValidationException if the freshness cannot be converted to a valid cron expression + */ + public static String convertFreshnessToCron(IntervalFreshness intervalFreshness) { + // First validate that conversion is possible + validateFreshnessForCron(intervalFreshness); + + // Then perform the conversion + switch (intervalFreshness.getTimeUnit()) { + case SECOND: + return String.format( + SECOND_CRON_EXPRESSION_TEMPLATE, intervalFreshness.getIntervalInt()); + case MINUTE: + return String.format( + MINUTE_CRON_EXPRESSION_TEMPLATE, intervalFreshness.getIntervalInt()); + case HOUR: + return String.format( + HOUR_CRON_EXPRESSION_TEMPLATE, intervalFreshness.getIntervalInt()); + case DAY: + return ONE_DAY_CRON_EXPRESSION_TEMPLATE; + default: + throw new ValidationException( + String.format( + "Unknown freshness time unit: %s.", + intervalFreshness.getTimeUnit())); + } } + private static void validateCronConstraints( + IntervalFreshness intervalFreshness, long cronUpperBound) { + int interval = intervalFreshness.getIntervalInt(); + TimeUnit timeUnit = intervalFreshness.getTimeUnit(); + // Freshness must be less than cronUpperBound for corresponding time unit when convert it + // to cron expression + if (interval >= cronUpperBound) { + throw new ValidationException( + String.format( + "In full refresh mode, freshness must be less than %s when the time unit is %s.", + cronUpperBound, timeUnit)); + } + // Freshness must be factors of cronUpperBound for corresponding time unit + if (cronUpperBound % interval != 0) { + throw new ValidationException( + String.format( + "In full refresh mode, only freshness that are factors of %s are currently supported when the time unit is %s.", + cronUpperBound, timeUnit)); + } + } + + private static void validateDayConstraints(IntervalFreshness 
intervalFreshness) { + // Since the number of days in each month is different, only one day of freshness is + // currently supported when the time unit is DAY + int interval = intervalFreshness.getIntervalInt(); + if (interval > 1) { + throw new ValidationException( + "In full refresh mode, freshness must be 1 when the time unit is DAY."); + } + } + + /** + * Creates an IntervalFreshness from a Duration, choosing the most appropriate time unit. + * Prefers larger units when possible (e.g., 60 seconds → 1 minute). Falls back to SECOND + * when the duration is not a whole number of minutes; a sub-second remainder is dropped. + * + * @param duration the duration to convert + * @return the freshness expressed in DAY, HOUR, MINUTE or SECOND + * @throws ValidationException if the resulting interval is not a positive int value + */ + public static IntervalFreshness fromDuration(Duration duration) { + final long interval; + final TimeUnit timeUnit; + if (duration.equals(duration.truncatedTo(ChronoUnit.DAYS))) { + interval = duration.toDays(); + timeUnit = TimeUnit.DAY; + } else if (duration.equals(duration.truncatedTo(ChronoUnit.HOURS))) { + interval = duration.toHours(); + timeUnit = TimeUnit.HOUR; + } else if (duration.equals(duration.truncatedTo(ChronoUnit.MINUTES))) { + interval = duration.toMinutes(); + timeUnit = TimeUnit.MINUTE; + } else { + interval = duration.getSeconds(); + timeUnit = TimeUnit.SECOND; + } + + // The private constructor skips validateIntervalInput, so reject zero, negative and + // int-overflowing intervals here instead of building an invalid freshness. + if (interval <= 0 || interval > Integer.MAX_VALUE) { + throw new ValidationException( + String.format( + "The freshness interval currently only supports positive integer type values. But was: %s", + duration)); + } + return new IntervalFreshness((int) interval, timeUnit); + } + + /** + * @deprecated Use {@link #getIntervalInt()} instead. 
+ */ + @Deprecated public String getInterval() { + return String.valueOf(interval); + } + + public int getIntervalInt() { return interval; } @@ -67,6 +221,21 @@ public TimeUnit getTimeUnit() { return timeUnit; } + public Duration toDuration() { + switch (timeUnit) { + case SECOND: + return Duration.ofSeconds(interval); + case MINUTE: + return Duration.ofMinutes(interval); + case HOUR: + return Duration.ofHours(interval); + case DAY: + return Duration.ofDays(interval); + default: + throw new IllegalStateException("Unexpected value: " + timeUnit); + } + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java new file mode 100644 index 0000000000000..33cb9adf7b6ca --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnricher.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Experimental; + +/** + * Enricher interface for determining materialized table properties during catalog resolution. + * + *

<p>This enricher resolves: + * + *

<ul> + *
  <li>Freshness intervals when not explicitly specified by the user + *
  <li>Physical refresh modes (CONTINUOUS or FULL) based on logical preferences and configuration + *
</ul> + * + *

Implementations can provide custom strategies tailored to different deployment environments or + * operational requirements. + */ +@Experimental +public interface MaterializedTableEnricher { + + /** + * Enriches a materialized table by determining its final freshness interval and refresh mode. + * + * @param catalogMaterializedTable the materialized table to enrich, which may have null + * freshness + * @return the enrichment result with resolved, non-null freshness and refresh mode + */ + MaterializedTableEnrichmentResult enrich(CatalogMaterializedTable catalogMaterializedTable); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java new file mode 100644 index 0000000000000..dc19d82ea2722 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/MaterializedTableEnrichmentResult.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; + +/** + * Result of the enrichment process containing the resolved freshness interval and physical refresh + * mode for a {@link CatalogMaterializedTable}. + * + *

This object is returned by {@link MaterializedTableEnricher} after determining the final, + * non-null values for both properties. + */ +@Experimental +public class MaterializedTableEnrichmentResult { + + private final IntervalFreshness freshness; + private final RefreshMode refreshMode; + + public MaterializedTableEnrichmentResult( + final IntervalFreshness freshness, final RefreshMode refreshMode) { + this.freshness = freshness; + this.refreshMode = refreshMode; + } + + public IntervalFreshness getFreshness() { + return freshness; + } + + public RefreshMode getRefreshMode() { + return refreshMode; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java index 82ad39d0fc3c0..6f0bf960b181f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java @@ -19,8 +19,8 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.List; @@ -28,6 +28,8 @@ import java.util.Objects; import java.util.Optional; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A validated {@link CatalogMaterializedTable} that is backed by the original metadata coming from * the {@link Catalog} but resolved by the framework. 
@@ -43,13 +45,19 @@ public class ResolvedCatalogMaterializedTable private final ResolvedSchema resolvedSchema; + private final RefreshMode refreshMode; + + private final IntervalFreshness freshness; + public ResolvedCatalogMaterializedTable( - CatalogMaterializedTable origin, ResolvedSchema resolvedSchema) { - this.origin = - Preconditions.checkNotNull( - origin, "Original catalog materialized table must not be null."); - this.resolvedSchema = - Preconditions.checkNotNull(resolvedSchema, "Resolved schema must not be null."); + CatalogMaterializedTable origin, + ResolvedSchema resolvedSchema, + RefreshMode refreshMode, + IntervalFreshness freshness) { + this.origin = checkNotNull(origin, "Original catalog materialized table must not be null."); + this.resolvedSchema = checkNotNull(resolvedSchema, "Resolved schema must not be null."); + this.refreshMode = checkNotNull(refreshMode, "Refresh mode must not be null."); + this.freshness = checkNotNull(freshness, "Freshness must not be null."); } @Override @@ -65,12 +73,13 @@ public String getComment() { @Override public CatalogBaseTable copy() { return new ResolvedCatalogMaterializedTable( - (CatalogMaterializedTable) origin.copy(), resolvedSchema); + (CatalogMaterializedTable) origin.copy(), resolvedSchema, refreshMode, freshness); } @Override public ResolvedCatalogMaterializedTable copy(Map options) { - return new ResolvedCatalogMaterializedTable(origin.copy(options), resolvedSchema); + return new ResolvedCatalogMaterializedTable( + origin.copy(options), resolvedSchema, refreshMode, freshness); } @Override @@ -80,7 +89,9 @@ public ResolvedCatalogMaterializedTable copy( byte[] serializedRefreshHandler) { return new ResolvedCatalogMaterializedTable( origin.copy(refreshStatus, refreshHandlerDescription, serializedRefreshHandler), - resolvedSchema); + resolvedSchema, + refreshMode, + freshness); } @Override @@ -124,8 +135,8 @@ public String getDefinitionQuery() { } @Override - public IntervalFreshness 
getDefinitionFreshness() { - return origin.getDefinitionFreshness(); + public @Nonnull IntervalFreshness getDefinitionFreshness() { + return freshness; } @Override @@ -134,8 +145,8 @@ public LogicalRefreshMode getLogicalRefreshMode() { } @Override - public RefreshMode getRefreshMode() { - return origin.getRefreshMode(); + public @Nonnull RefreshMode getRefreshMode() { + return refreshMode; } @Override diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java deleted file mode 100644 index cd58bff4d9102..0000000000000 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/IntervalFreshnessUtils.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.utils; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.IntervalFreshness; - -import org.apache.commons.lang3.math.NumberUtils; - -import java.time.Duration; - -/** Utilities to {@link IntervalFreshness}. */ -@Internal -public class IntervalFreshnessUtils { - - private static final String SECOND_CRON_EXPRESSION_TEMPLATE = "0/%s * * * * ? *"; - private static final String MINUTE_CRON_EXPRESSION_TEMPLATE = "0 0/%s * * * ? *"; - private static final String HOUR_CRON_EXPRESSION_TEMPLATE = "0 0 0/%s * * ? *"; - private static final String ONE_DAY_CRON_EXPRESSION_TEMPLATE = "0 0 0 * * ? *"; - - private static final long SECOND_CRON_UPPER_BOUND = 60; - private static final long MINUTE_CRON_UPPER_BOUND = 60; - private static final long HOUR_CRON_UPPER_BOUND = 24; - - private IntervalFreshnessUtils() {} - - @VisibleForTesting - static void validateIntervalFreshness(IntervalFreshness intervalFreshness) { - if (!NumberUtils.isParsable(intervalFreshness.getInterval())) { - throw new ValidationException( - String.format( - "The interval freshness value '%s' is an illegal integer type value.", - intervalFreshness.getInterval())); - } - - if (!NumberUtils.isDigits(intervalFreshness.getInterval())) { - throw new ValidationException( - "The freshness interval currently only supports integer type values."); - } - } - - public static Duration convertFreshnessToDuration(IntervalFreshness intervalFreshness) { - // validate the freshness value firstly - validateIntervalFreshness(intervalFreshness); - - long interval = Long.parseLong(intervalFreshness.getInterval()); - switch (intervalFreshness.getTimeUnit()) { - case DAY: - return Duration.ofDays(interval); - case HOUR: - return Duration.ofHours(interval); - case MINUTE: - return Duration.ofMinutes(interval); - case SECOND: - return 
Duration.ofSeconds(interval); - default: - throw new ValidationException( - String.format( - "Unknown freshness time unit: %s.", - intervalFreshness.getTimeUnit())); - } - } - - /** - * This is an util method that is used to convert the freshness of materialized table to cron - * expression in full refresh mode. Since freshness and cron expression cannot be converted - * equivalently, there are currently only a limited patterns of freshness that can be converted - * to cron expression. - */ - public static String convertFreshnessToCron(IntervalFreshness intervalFreshness) { - switch (intervalFreshness.getTimeUnit()) { - case SECOND: - return validateAndConvertCron( - intervalFreshness, - SECOND_CRON_UPPER_BOUND, - SECOND_CRON_EXPRESSION_TEMPLATE); - case MINUTE: - return validateAndConvertCron( - intervalFreshness, - MINUTE_CRON_UPPER_BOUND, - MINUTE_CRON_EXPRESSION_TEMPLATE); - case HOUR: - return validateAndConvertCron( - intervalFreshness, HOUR_CRON_UPPER_BOUND, HOUR_CRON_EXPRESSION_TEMPLATE); - case DAY: - return validateAndConvertDayCron(intervalFreshness); - default: - throw new ValidationException( - String.format( - "Unknown freshness time unit: %s.", - intervalFreshness.getTimeUnit())); - } - } - - private static String validateAndConvertCron( - IntervalFreshness intervalFreshness, long cronUpperBound, String cronTemplate) { - long interval = Long.parseLong(intervalFreshness.getInterval()); - IntervalFreshness.TimeUnit timeUnit = intervalFreshness.getTimeUnit(); - // Freshness must be less than cronUpperBound for corresponding time unit when convert it - // to cron expression - if (interval >= cronUpperBound) { - throw new ValidationException( - String.format( - "In full refresh mode, freshness must be less than %s when the time unit is %s.", - cronUpperBound, timeUnit)); - } - // Freshness must be factors of cronUpperBound for corresponding time unit - if (cronUpperBound % interval != 0) { - throw new ValidationException( - String.format( - "In full 
refresh mode, only freshness that are factors of %s are currently supported when the time unit is %s.", - cronUpperBound, timeUnit)); - } - - return String.format(cronTemplate, interval); - } - - private static String validateAndConvertDayCron(IntervalFreshness intervalFreshness) { - // Since the number of days in each month is different, only one day of freshness is - // currently supported when the time unit is DAY - long interval = Long.parseLong(intervalFreshness.getInterval()); - if (interval > 1) { - throw new ValidationException( - "In full refresh mode, freshness must be 1 when the time unit is DAY."); - } - return ONE_DAY_CRON_EXPRESSION_TEMPLATE; - } -} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java index bbe708e650940..adb8a845421ea 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.expressions.DefaultSqlFactory; import org.junit.jupiter.api.Test; @@ -165,6 +166,8 @@ private static Stream> resolvedCatalogBaseTables() { .refreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED) .refreshHandlerDescription("description") .build(), - resolvedSchema)); + resolvedSchema, + RefreshMode.CONTINUOUS, + IntervalFreshness.ofHour("123"))); } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java similarity index 74% rename from 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java rename to flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java index 86e660d50b133..de794684956c2 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/IntervalFreshnessUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/IntervalFreshnessTest.java @@ -16,53 +16,69 @@ * limitations under the License. */ -package org.apache.flink.table.utils; +package org.apache.flink.table.catalog; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.IntervalFreshness; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.validateIntervalFreshness; +import static org.apache.flink.table.catalog.IntervalFreshness.convertFreshnessToCron; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Tests for {@link IntervalFreshnessUtils}. */ -public class IntervalFreshnessUtilsTest { +/** Tests for {@link IntervalFreshness}. 
*/ +public class IntervalFreshnessTest { - @Test - void testIllegalIntervalFreshness() { - assertThatThrownBy(() -> validateIntervalFreshness(IntervalFreshness.ofMinute("2efedd"))) + @ParameterizedTest + @ValueSource(strings = {"2efedd", "2.5", "-2", "0", "12345678901234567890"}) + void testIllegalIntervalFreshness(String invalidInput) { + assertThatThrownBy(() -> IntervalFreshness.ofMinute(invalidInput)) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "The interval freshness value '2efedd' is an illegal integer type value."); + String.format( + "The freshness interval currently only supports positive integer type values. But was: %s", + invalidInput)); + } - assertThatThrownBy(() -> validateIntervalFreshness(IntervalFreshness.ofMinute("2.5"))) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "The freshness interval currently only supports integer type values."); + @Test + void testConvertDurationToFreshnessInterval() { + // verify second + IntervalFreshness actualSeconds = IntervalFreshness.fromDuration(Duration.ofSeconds(20)); + assertThat(actualSeconds).isEqualTo(IntervalFreshness.ofSecond("20")); + + // verify minute + IntervalFreshness actualMinutes = IntervalFreshness.fromDuration(Duration.ofMinutes(3)); + assertThat(actualMinutes).isEqualTo(IntervalFreshness.ofMinute("3")); + + // verify hour + IntervalFreshness actualHour = IntervalFreshness.fromDuration(Duration.ofHours(1)); + assertThat(actualHour).isEqualTo(IntervalFreshness.ofHour("1")); + + // verify day + IntervalFreshness actualDay = IntervalFreshness.fromDuration(Duration.ofDays(2)); + assertThat(actualDay).isEqualTo(IntervalFreshness.ofDay("2")); } @Test void testConvertFreshnessToDuration() { // verify second - Duration actualSecond = convertFreshnessToDuration(IntervalFreshness.ofSecond("20")); + Duration actualSecond = IntervalFreshness.ofSecond("20").toDuration(); assertThat(actualSecond).isEqualTo(Duration.ofSeconds(20)); // verify minute - Duration 
actualMinute = convertFreshnessToDuration(IntervalFreshness.ofMinute("3")); + Duration actualMinute = IntervalFreshness.ofMinute("3").toDuration(); assertThat(actualMinute).isEqualTo(Duration.ofMinutes(3)); // verify hour - Duration actualHour = convertFreshnessToDuration(IntervalFreshness.ofHour("3")); + Duration actualHour = IntervalFreshness.ofHour("3").toDuration(); assertThat(actualHour).isEqualTo(Duration.ofHours(3)); // verify day - Duration actualDay = convertFreshnessToDuration(IntervalFreshness.ofDay("3")); + Duration actualDay = IntervalFreshness.ofDay("3").toDuration(); assertThat(actualDay).isEqualTo(Duration.ofDays(3)); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java index 46c6dd44fa7b3..2107b209b27c1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java @@ -26,6 +26,8 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -50,10 +52,8 @@ import java.util.stream.Collectors; import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER; -import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.MATERIALIZED_TABLE_FRESHNESS_THRESHOLD; import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToCron; -import static org.apache.flink.table.utils.IntervalFreshnessUtils.convertFreshnessToDuration; +import static org.apache.flink.table.catalog.IntervalFreshness.validateFreshnessForCron; /** A converter for {@link SqlCreateMaterializedTable}. */ public class SqlCreateMaterializedTableConverter @@ -77,33 +77,27 @@ public Operation convertSqlNode( // get freshness IntervalFreshness intervalFreshness = - MaterializedTableUtils.getMaterializedTableFreshness( - sqlCreateMaterializedTable.getFreshness()); + Optional.ofNullable(sqlCreateMaterializedTable.getFreshness()) + .map(MaterializedTableUtils::getMaterializedTableFreshness) + .orElse(null); - // get refresh mode - SqlRefreshMode sqlRefreshMode = null; - if (sqlCreateMaterializedTable.getRefreshMode().isPresent()) { - sqlRefreshMode = - sqlCreateMaterializedTable - .getRefreshMode() - .get() - .getValueAs(SqlRefreshMode.class); - } - CatalogMaterializedTable.LogicalRefreshMode logicalRefreshMode = + // Get the logical refresh mode from SQL + SqlRefreshMode sqlRefreshMode = + Optional.ofNullable(sqlCreateMaterializedTable.getRefreshMode()) + .map(mode -> mode.getValueAs(SqlRefreshMode.class)) + .orElse(null); + + final LogicalRefreshMode logicalRefreshMode = MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode); - // only MATERIALIZED_TABLE_FRESHNESS_THRESHOLD configured in flink conf yaml work, so we get - // it from rootConfiguration instead of table config - CatalogMaterializedTable.RefreshMode refreshMode = - MaterializedTableUtils.deriveRefreshMode( - context.getTableConfig() - .getRootConfiguration() - .get(MATERIALIZED_TABLE_FRESHNESS_THRESHOLD), - convertFreshnessToDuration(intervalFreshness), - 
logicalRefreshMode); - // If the refresh mode is full, validate whether the freshness can convert to cron - // expression in advance - if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) { - convertFreshnessToCron(intervalFreshness); + + // get the physical refresh mode from SQL + final RefreshMode refreshMode = + sqlRefreshMode == null + ? null + : MaterializedTableUtils.fromSqltoRefreshMode(sqlRefreshMode); + + if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode && intervalFreshness != null) { + validateFreshnessForCron(intervalFreshness); } // get query schema and definition query diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java index ac41bfa58c1ff..ba765e40a698a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java @@ -22,13 +22,12 @@ import org.apache.flink.sql.parser.ddl.SqlRefreshMode; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.calcite.sql.SqlIntervalLiteral; import org.apache.calcite.sql.type.SqlTypeFamily; -import java.time.Duration; - /** The utils for materialized table. 
*/ @Internal public class MaterializedTableUtils { @@ -79,22 +78,14 @@ public static CatalogMaterializedTable.LogicalRefreshMode deriveLogicalRefreshMo } } - public static CatalogMaterializedTable.RefreshMode deriveRefreshMode( - Duration threshold, - Duration definedFreshness, - CatalogMaterializedTable.LogicalRefreshMode definedRefreshMode) { - // If the refresh mode is specified manually, use it directly. - if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.FULL) { - return CatalogMaterializedTable.RefreshMode.FULL; - } else if (definedRefreshMode == CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS) { - return CatalogMaterializedTable.RefreshMode.CONTINUOUS; - } - - // derive the actual refresh mode via defined freshness - if (definedFreshness.compareTo(threshold) < 0) { - return CatalogMaterializedTable.RefreshMode.CONTINUOUS; - } else { - return CatalogMaterializedTable.RefreshMode.FULL; + public static RefreshMode fromSqltoRefreshMode(SqlRefreshMode sqlRefreshMode) { + switch (sqlRefreshMode) { + case FULL: + return RefreshMode.FULL; + case CONTINUOUS: + return RefreshMode.CONTINUOUS; + default: + throw new IllegalArgumentException("Unknown refresh mode: " + sqlRefreshMode); } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index cdf6fbe417475..13835f16d615c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -85,6 +85,9 @@ import org.assertj.core.api.HamcrestCondition; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import javax.annotation.Nullable; @@ -96,7 +99,9 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.planner.utils.OperationMatchers.entry; @@ -1404,53 +1409,93 @@ void testFailedToAlterTableDropDistribution() throws Exception { checkAlterNonExistTable("alter table %s nonexistent drop watermark"); } - @Test - void createMaterializedTableWithDistribution() throws Exception { - final String sql = - "CREATE MATERIALIZED TABLE users_shops (" - + " PRIMARY KEY (user_id) not enforced)" - + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n" - + " WITH(\n" - + " 'format' = 'debezium-json'\n" - + " )\n" - + " FRESHNESS = INTERVAL '30' SECOND\n" - + " AS SELECT 1 as shop_id, 2 as user_id "; - - final String expectedSummaryString = - "CREATE MATERIALIZED TABLE: (materializedTable: " - + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" - + " `shop_id` INT NOT NULL,\n" - + " `user_id` INT NOT NULL,\n" - + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" - + "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], " - + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " - + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=CONTINUOUS, " - + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" - + " `shop_id` INT NOT NULL,\n" - + " `user_id` INT NOT NULL,\n" - + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" - + ")}], identifier: 
[`builtin`.`default`.`users_shops`])"; + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("provideCreateMaterializedTableTestCases") + void createMaterializedTableWithVariousOptions( + String testName, + String sql, + String expectedSummaryString, + Consumer additionalAssertions) { final Operation operation = parse(sql); assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); - assertThat( - ((CreateMaterializedTableOperation) operation) - .getCatalogMaterializedTable() - .getDistribution() - .get()) - .isEqualTo(TableDistribution.of(Kind.HASH, 7, List.of("user_id"))); - - prepareMaterializedTable("tb2", false, 1, null, "SELECT 1"); - assertThatThrownBy( - () -> - parse( - "alter MATERIALIZED table cat1.db1.tb2 modify distribution into 3 buckets")) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Materialized table `cat1`.`db1`.`tb2` does not have a distribution to modify."); + final CreateMaterializedTableOperation createMaterializedTableOperation = + (CreateMaterializedTableOperation) operation; + + additionalAssertions.accept(createMaterializedTableOperation); + } + + private static Stream provideCreateMaterializedTableTestCases() { + return Stream.of( + Arguments.of( + "with refresh mode continuous", + "CREATE MATERIALIZED TABLE users_shops (" + + " PRIMARY KEY (user_id) not enforced)" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " REFRESH_MODE = CONTINUOUS\n" + + " AS SELECT 1 as shop_id, 2 as user_id ", + "CREATE MATERIALIZED TABLE: (materializedTable: " + + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" + + " `shop_id` INT NOT NULL,\n" + + " `user_id` INT NOT NULL,\n" + + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" + + "), comment='null', distribution=null, partitionKeys=[], " + + "options={format=debezium-json}, 
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " + + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, " + + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" + + " `shop_id` INT NOT NULL,\n" + + " `user_id` INT NOT NULL,\n" + + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" + + ")}], identifier: [`builtin`.`default`.`users_shops`])", + (Consumer) + op -> { + assertThat( + op.getCatalogMaterializedTable() + .getDefinitionFreshness()) + .isEqualTo(IntervalFreshness.ofSecond("30")); + assertThat(op.getCatalogMaterializedTable().getRefreshMode()) + .isSameAs(RefreshMode.CONTINUOUS); + }), + Arguments.of( + "with distribution", + "CREATE MATERIALIZED TABLE users_shops (" + + " PRIMARY KEY (user_id) not enforced)" + + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT 1 as shop_id, 2 as user_id ", + "CREATE MATERIALIZED TABLE: (materializedTable: " + + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" + + " `shop_id` INT NOT NULL,\n" + + " `user_id` INT NOT NULL,\n" + + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" + + "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], " + + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " + + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=null, " + + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" + + " `shop_id` INT NOT NULL,\n" + + " `user_id` INT NOT NULL,\n" + + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" + + ")}], identifier: [`builtin`.`default`.`users_shops`])", + (Consumer) + op -> + 
assertThat( + op.getCatalogMaterializedTable() + .getDistribution() + .get()) + .isEqualTo( + TableDistribution.of( + Kind.HASH, + 7, + List.of("user_id"))))); } @Test diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 39275541d8cfc..67a828d7fe088 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -23,6 +23,8 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.IntervalFreshness; @@ -128,9 +130,7 @@ void testCreateMaterializedTable() { CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); - Map options = new HashMap<>(); - options.put("connector", "filesystem"); - options.put("format", "json"); + Map options = Map.of("connector", "filesystem", "format", "json"); CatalogMaterializedTable expected = CatalogMaterializedTable.newBuilder() .schema( @@ -146,13 +146,120 @@ void testCreateMaterializedTable() { .partitionKeys(Arrays.asList("a", "d")) .freshness(IntervalFreshness.ofSecond("30")) 
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL) - .refreshMode(CatalogMaterializedTable.RefreshMode.FULL) + .refreshMode(RefreshMode.FULL) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .definitionQuery( + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`") + .build(); + + final IntervalFreshness resolvedFreshness = materializedTable.getDefinitionFreshness(); + assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofSecond("30")); + + final RefreshMode resolvedRefreshMode = materializedTable.getRefreshMode(); + assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL); + + assertThat(materializedTable.getOrigin()).isEqualTo(expected); + } + + @Test + void testCreateMaterializedTableWithoutFreshness() { + final String sql = + "CREATE MATERIALIZED TABLE mtbl1 (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); + + CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; + ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); + assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + Map options = Map.of("connector", "filesystem", "format", "json"); + + CatalogMaterializedTable expected = + CatalogMaterializedTable.newBuilder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.BIGINT().notNull()) + .column("b", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .column("c", DataTypes.INT()) + .column("d", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .primaryKeyNamed("ct1", Collections.singletonList("a")) + .build()) + .comment("materialized table 
comment") + .options(options) + .partitionKeys(Arrays.asList("a", "d")) + .logicalRefreshMode(LogicalRefreshMode.FULL) + .refreshMode(RefreshMode.FULL) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .definitionQuery( + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`") + .build(); + + // The resolved freshness should default to 1 hour (the FULL mode default) + final IntervalFreshness resolvedFreshness = materializedTable.getDefinitionFreshness(); + assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofHour("1")); + + final RefreshMode resolvedRefreshMode = materializedTable.getRefreshMode(); + assertThat(resolvedRefreshMode).isSameAs(RefreshMode.FULL); + + assertThat(materializedTable.getOrigin()).isEqualTo(expected); + } + + @Test + void testCreateMaterializedTableWithoutFreshnessAndRefreshMode() { + final String sql = + "CREATE MATERIALIZED TABLE mtbl1 (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); + + CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation; + ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable(); + assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + Map options = Map.of("connector", "filesystem", "format", "json"); + + CatalogMaterializedTable expected = + CatalogMaterializedTable.newBuilder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.BIGINT().notNull()) + .column("b", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .column("c", DataTypes.INT()) + .column("d", DataTypes.VARCHAR(Integer.MAX_VALUE)) + .primaryKeyNamed("ct1", Collections.singletonList("a")) + .build()) +
.comment("materialized table comment") + .options(options) + .partitionKeys(Arrays.asList("a", "d")) + .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC) .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) .definitionQuery( "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" + "FROM `builtin`.`default`.`t1` AS `t1`") .build(); + final IntervalFreshness resolvedFreshness = materializedTable.getDefinitionFreshness(); + assertThat(resolvedFreshness).isEqualTo(IntervalFreshness.ofMinute("3")); assertThat(materializedTable.getOrigin()).isEqualTo(expected); } @@ -206,8 +313,7 @@ void testContinuousRefreshMode() { assertThat(materializedTable.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); - assertThat(materializedTable.getRefreshMode()) - .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS); + assertThat(materializedTable.getRefreshMode()).isEqualTo(RefreshMode.CONTINUOUS); // test continuous mode by manual specify final String sql2 = @@ -242,8 +348,7 @@ void testFullRefreshMode() { assertThat(materializedTable.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); - assertThat(materializedTable.getRefreshMode()) - .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL); + assertThat(materializedTable.getRefreshMode()).isEqualTo(RefreshMode.FULL); // test full mode by manual specify final String sql2 = @@ -259,17 +364,26 @@ void testFullRefreshMode() { assertThat(materializedTable2.getLogicalRefreshMode()) .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL); - assertThat(materializedTable2.getRefreshMode()) - .isEqualTo(CatalogMaterializedTable.RefreshMode.FULL); - + assertThat(materializedTable2.getRefreshMode()).isEqualTo(RefreshMode.FULL); final String sql3 = "CREATE MATERIALIZED TABLE mtbl1\n" + "FRESHNESS = INTERVAL '40' MINUTE\n" + + "REFRESH_MODE = FULL\n" + "AS SELECT * FROM t1"; assertThatThrownBy(() -> parse(sql3)) .isInstanceOf(ValidationException.class) 
.hasMessageContaining( "In full refresh mode, only freshness that are factors of 60 are currently supported when the time unit is MINUTE."); + + final String sql4 = + "CREATE MATERIALIZED TABLE mtbl1\n" + + "FRESHNESS = INTERVAL '40' MINUTE\n" + + "AS SELECT * FROM t1"; + + assertThatThrownBy(() -> parse(sql4)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "In full refresh mode, only freshness that are factors of 60 are currently supported when the time unit is MINUTE."); } @Test diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java index 646b2e832c768..5a63cc2a2d805 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java @@ -110,7 +110,9 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase { .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS) .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) .build(), - CREATE_RESOLVED_SCHEMA); + CREATE_RESOLVED_SCHEMA, + CatalogMaterializedTable.RefreshMode.CONTINUOUS, + FRESHNESS); private static final TestRefreshHandler REFRESH_HANDLER = new TestRefreshHandler("jobID: xxx, clusterId: yyy");