5 changes: 3 additions & 2 deletions docs/content.zh/docs/dev/table/materialized-table/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ under the License.

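## Data Freshness

Data freshness defines the maximum time by which the materialized table's data may lag behind updates to the base tables. It is not an absolute guarantee but a target that Flink attempts to meet. The framework makes a best effort to refresh the data in the materialized table within the specified freshness.
Data freshness defines the maximum time by which the materialized table's data may lag behind updates to the base tables. It is not an absolute guarantee but a target that Flink attempts to meet. The framework makes a best effort to refresh the data in the materialized table within the specified freshness target.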

Data freshness is optional when creating a materialized table. If not specified, the system uses the default freshness based on the refresh mode: [materialized-table.default-freshness.continuous]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-continuous) (default: 3 minutes) for CONTINUOUS mode, or [materialized-table.default-freshness.full]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode.

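Data freshness is a key attribute of a materialized table and serves two main purposes:
- **Determining the refresh mode**: Currently there are CONTINUOUS and FULL modes. For details on how the refresh mode is determined, see the [materialized-table.refresh-mode.freshness-threshold]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold) configuration option.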
Expand All @@ -59,4 +61,3 @@ under the License.
## Schema

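The `Schema` definition of a materialized table is the same as that of a regular table, and it can declare a primary key and partition fields. Its column names and types are derived from the corresponding query and cannot be specified manually.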

91 changes: 53 additions & 38 deletions docs/content.zh/docs/dev/table/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ Flink SQL currently supports the following materialized table operations:

```
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name

[ ([ <table_constraint> ]) ]

[COMMENT table_comment]

[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]

[WITH (key1=val1, key2=val2, ...)]

FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }

[REFRESH_MODE = { CONTINUOUS | FULL }]

AS <select_statement>

<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
```
Expand All @@ -69,7 +69,7 @@ AS <select_statement>
CREATE MATERIALIZED TABLE my_materialized_table
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
AS SELECT
ds
FROM
...
Expand Down Expand Up @@ -103,9 +103,11 @@ CREATE MATERIALIZED TABLE my_materialized_table

`FRESHNESS` specifies the data freshness of a materialized table.

`FRESHNESS` is optional. When omitted, the system uses the default freshness based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.

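**FRESHNESS and Refresh Mode Relationship**

Data freshness defines the maximum time by which the materialized table's content may lag behind updates to the base tables. It serves two purposes: first, it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold); second, it determines the data refresh frequency needed to meet the actual data freshness requirement.
Data freshness defines the maximum time by which the materialized table's content may lag behind updates to the base tables. When not specified, it uses the default value from configuration based on the refresh mode. It serves two purposes: first, it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold); second, it determines the data refresh frequency needed to meet the actual data freshness requirement.

**Explanation of FRESHNESS Parameter**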

Expand All @@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR
FRESHNESS = INTERVAL '1' DAY
```

**Default FRESHNESS Example:**
(Assuming `materialized-table.default-freshness.continuous` is 3 minutes, `materialized-table.default-freshness.full` is 1 hour, and `materialized-table.refresh-mode.freshness-threshold` is 30 minutes)

```sql
-- FRESHNESS is omitted, uses the configured default of 3 minutes for CONTINUOUS mode
-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 3 minutes
CREATE MATERIALIZED TABLE my_materialized_table
AS SELECT * FROM source_table;

-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the configured default of 1 hour
-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour
CREATE MATERIALIZED TABLE my_materialized_table_full
REFRESH_MODE = FULL
AS SELECT * FROM source_table;
```

**Invalid `FRESHNESS` Examples:**

```sql
Expand All @@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR
```

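<span class="label label-danger">Note</span>
- If FRESHNESS is not specified, the table will use the default freshness interval based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.
- Although the materialized table data is refreshed within the defined freshness as far as possible, meeting the freshness requirement is not guaranteed.
- In CONTINUOUS mode, the data freshness equals the `checkpoint` interval, so setting the data freshness too short may affect job performance. In addition, to optimize `checkpoint` performance, it is recommended to [enable Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog).
- In FULL mode, data freshness is converted into a `cron` expression, so only freshness intervals within predefined time-interval units are currently supported. This design keeps the semantics consistent with `cron` expressions. Specifically, the following freshness intervals are supported: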
Expand All @@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS SELECT
...
...

-- The refresh mode of the created materialized table is FULL, and the job's schedule cycle is 10 minutes.
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' MINUTE
REFRESH_MODE = FULL
AS SELECT
...
...
```

## AS <select_statement>
Expand Down Expand Up @@ -204,22 +223,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '10' SECOND
AS
SELECT
AS
SELECT
k.ds,
k.user_id,
COUNT(*) AS event_count,
SUM(k.amount) AS total_amount,
MAX(u.age) AS max_age
FROM
FROM
kafka_catalog.db1.kafka_table k
JOIN
JOIN
user_catalog.db1.user_table u
ON
ON
k.user_id = u.user_id
WHERE
WHERE
k.event_type = 'purchase'
GROUP BY
GROUP BY
k.ds, k.user_id
```

Expand All @@ -233,22 +252,22 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT
AS
SELECT
p.ds,
p.product_id,
p.product_name,
AVG(s.sale_price) AS avg_sale_price,
SUM(s.quantity) AS total_quantity
FROM
FROM
paimon_catalog.db1.product_table p
LEFT JOIN
LEFT JOIN
paimon_catalog.db1.sales_table s
ON
ON
p.product_id = s.product_id
WHERE
WHERE
p.category = 'electronics'
GROUP BY
GROUP BY
p.ds, p.product_id, p.product_name
```

Expand Down Expand Up @@ -276,7 +295,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND

`SUSPEND` suspends the background refresh pipeline of a materialized table.

**Example:**
**Example:**

```sql
-- Specify the SAVEPOINT path before suspending
Expand All @@ -297,7 +316,7 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=

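`RESUME` resumes the refresh pipeline of a materialized table. When resuming, parameters of the materialized table can be specified dynamically through the `WITH` clause. They take effect only for the refresh pipeline being resumed and are not persisted to the materialized table.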

**Example:**
**Example:**

```sql
-- Resume the specified materialized table
Expand Down Expand Up @@ -358,21 +377,21 @@ ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statemen
-- Original materialized table definition
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' SECOND
AS
SELECT
AS
SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM
FROM
kafka_catalog.db1.events
WHERE
WHERE
event_type = 'purchase'
GROUP BY
GROUP BY
user_id;

-- Modify the query of the existing materialized table
ALTER MATERIALIZED TABLE my_materialized_table
AS SELECT
AS SELECT
user_id,
COUNT(*) AS event_count,
SUM(amount) AS total_amount,
Expand Down Expand Up @@ -403,7 +422,3 @@ DROP MATERIALIZED TABLE [IF EXISTS] [catalog_name.][database_name.]table_name
-- Drop the specified materialized table
DROP MATERIALIZED TABLE IF EXISTS my_materialized_table;
```




4 changes: 3 additions & 1 deletion docs/content/docs/dev/table/materialized-table/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ Materialized Table encompass the following core concepts: Data Freshness, Refres

## Data Freshness

Data freshness defines the maximum amount of time that the materialized table’s content should lag behind updates to the base tables. Freshness is not a guarantee. Instead, it is a target that Flink attempts to meet. Data in materialized table is refreshed as closely as possible within the freshness.
Data freshness defines the maximum amount of time that the materialized table's content should lag behind updates to the base tables. Freshness is not a guarantee. Instead, it is a target that Flink attempts to meet. The data in the materialized table is refreshed as closely as possible within the freshness target.

Data freshness is optional when creating a materialized table. If not specified, the system uses the default freshness based on the refresh mode: [materialized-table.default-freshness.continuous]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-continuous) (default: 3 minutes) for CONTINUOUS mode, or [materialized-table.default-freshness.full]({{< ref "docs/dev/table/config" >}}#materialized-table-default-freshness-full) (default: 1 hour) for FULL mode.

Data freshness is a crucial attribute of a materialized table, serving two main purposes:
- **Determining the Refresh Mode**. Currently, there are CONTINUOUS and FULL modes. For details on how to determine the refresh mode, refer to the [materialized-table.refresh-mode.freshness-threshold]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold) configuration item.
Expand Down
55 changes: 36 additions & 19 deletions docs/content/docs/dev/table/materialized-table/statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name

[WITH (key1=val1, key2=val2, ...)]

FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
[FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }]

[REFRESH_MODE = { CONTINUOUS | FULL }]

Expand All @@ -69,7 +69,7 @@ AS <select_statement>
CREATE MATERIALIZED TABLE my_materialized_table
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT
AS SELECT
ds
FROM
...
Expand Down Expand Up @@ -103,9 +103,11 @@ As shown in the above example, we specified the date-formatter option for the `d

`FRESHNESS` defines the data freshness of a materialized table.

`FRESHNESS` is optional. When omitted, the system uses the default freshness based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.

**FRESHNESS and Refresh Mode Relationship**

FRESHNESS defines the maximum amount of time that the materialized tables content should lag behind updates to the base tables. It does two things, firstly it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold), followed by determines the data refresh frequency to meet the actual data freshness requirements.
FRESHNESS defines the maximum amount of time that the materialized table's content should lag behind updates to the base tables. When not specified, it uses the default value from configuration based on the refresh mode. It serves two purposes: first, it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold); second, it determines the data refresh frequency needed to meet the actual data freshness requirements.
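
The relationship between FRESHNESS and the refresh mode can be sketched with two statements. This is a minimal illustration, assuming `materialized-table.refresh-mode.freshness-threshold` is left at its default of 30 minutes and that a freshness below the threshold selects CONTINUOUS mode while one above it selects FULL mode. The table names `mt_below_threshold` and `mt_above_threshold` are hypothetical.

```sql
-- Assumption: the freshness threshold is 30 minutes (the documented default).

-- FRESHNESS (10 seconds) is below the threshold, so CONTINUOUS mode is expected.
CREATE MATERIALIZED TABLE mt_below_threshold
FRESHNESS = INTERVAL '10' SECOND
AS SELECT * FROM source_table;

-- FRESHNESS (1 hour) is above the threshold, so FULL mode is expected.
CREATE MATERIALIZED TABLE mt_above_threshold
FRESHNESS = INTERVAL '1' HOUR
AS SELECT * FROM source_table;
```

Specifying `REFRESH_MODE` explicitly would take the mode decision out of the threshold comparison. The sketch deliberately omits it so the effect of FRESHNESS alone is visible.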

**Explanation of FRESHNESS Parameter**

Expand All @@ -128,6 +130,22 @@ FRESHNESS = INTERVAL '1' HOUR
FRESHNESS = INTERVAL '1' DAY
```

**Default FRESHNESS Example:**
(Assuming `materialized-table.default-freshness.continuous` is 3 minutes, `materialized-table.default-freshness.full` is 1 hour, and `materialized-table.refresh-mode.freshness-threshold` is 30 minutes)

```sql
-- FRESHNESS is omitted, uses the configured default of 3 minutes for CONTINUOUS mode
-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 3 minutes
CREATE MATERIALIZED TABLE my_materialized_table
AS SELECT * FROM source_table;

-- FRESHNESS is omitted and FULL mode is explicitly specified, uses the configured default of 1 hour
-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour
CREATE MATERIALIZED TABLE my_materialized_table_full
REFRESH_MODE = FULL
AS SELECT * FROM source_table;
```
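
The defaults can also be illustrated with a session-level override. This is a hedged sketch, not confirmed by this page: it assumes that `materialized-table.default-freshness.continuous` can be changed with the SQL `SET` statement in the current session, and `orders_mt` is a hypothetical table name.

```sql
-- Assumption: the option can be overridden per session via SET.
SET 'materialized-table.default-freshness.continuous' = '1 min';

-- FRESHNESS and REFRESH_MODE are both omitted, so the table is expected to pick up
-- the overridden default of 1 minute and run in CONTINUOUS mode.
CREATE MATERIALIZED TABLE orders_mt
AS SELECT * FROM source_table;
```

If the override is not picked up in a given deployment, the cluster-level configuration value would still apply.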

**Invalid `FRESHNESS` Examples:**

```sql
Expand All @@ -147,6 +165,7 @@ FRESHNESS = INTERVAL '5' HOUR
```

<span class="label label-danger">Note</span>
- If FRESHNESS is not specified, the table will use the default freshness interval based on the refresh mode: `materialized-table.default-freshness.continuous` (default: 3 minutes) for CONTINUOUS mode, or `materialized-table.default-freshness.full` (default: 1 hour) for FULL mode.
- The materialized table data is refreshed as closely as possible within the defined freshness, but meeting the freshness target is not guaranteed.
- In CONTINUOUS mode, the data freshness interval is aligned with the checkpoint interval, so setting it too short can impact job performance. To optimize checkpoint performance, consider [enabling Changelog]({{< ref "docs/ops/state/state_backends" >}}#incremental-checkpoints).
- In FULL mode, data freshness must be translated into a cron expression. Consequently, only freshness intervals within predefined time spans are currently supported, which keeps the behavior aligned with cron's capabilities. Specifically, the following freshness intervals are supported:
Expand All @@ -168,14 +187,14 @@ CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS SELECT
...
...

-- The refresh mode of the created materialized table is FULL, and the job's schedule cycle is 10 minutes.
CREATE MATERIALIZED TABLE my_materialized_table
FRESHNESS = INTERVAL '10' MINUTE
REFRESH_MODE = FULL
AS SELECT
...
...
```

## AS <select_statement>
Expand Down Expand Up @@ -204,22 +223,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_continuous
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '10' SECOND
AS
SELECT
AS SELECT
k.ds,
k.user_id,
COUNT(*) AS event_count,
SUM(k.amount) AS total_amount,
MAX(u.age) AS max_age
FROM
FROM
kafka_catalog.db1.kafka_table k
JOIN
JOIN
user_catalog.db1.user_table u
ON
ON
k.user_id = u.user_id
WHERE
WHERE
k.event_type = 'purchase'
GROUP BY
GROUP BY
k.ds, k.user_id
```

Expand All @@ -233,22 +251,21 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
)
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT
AS SELECT
p.ds,
p.product_id,
p.product_name,
AVG(s.sale_price) AS avg_sale_price,
SUM(s.quantity) AS total_quantity
FROM
FROM
paimon_catalog.db1.product_table p
LEFT JOIN
LEFT JOIN
paimon_catalog.db1.sales_table s
ON
ON
p.product_id = s.product_id
WHERE
WHERE
p.category = 'electronics'
GROUP BY
GROUP BY
p.ds, p.product_id, p.product_name
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>materialized-table.default-freshness.continuous</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">3 min</td>
<td>Duration</td>
<td>The default freshness interval for continuous refresh mode when the FRESHNESS clause is omitted in a materialized table definition.</td>
</tr>
<tr>
<td><h5>materialized-table.default-freshness.full</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The default freshness interval for full refresh mode when the FRESHNESS clause is omitted in a materialized table definition.</td>
</tr>
<tr>
<td><h5>materialized-table.refresh-mode.freshness-threshold</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">30 min</td>
Expand Down