diff --git a/docs/content.zh/docs/dev/table/materialized-table/deployment.md b/docs/content.zh/docs/dev/table/materialized-table/deployment.md
new file mode 100644
index 0000000000000..ee046807bc15e
--- /dev/null
+++ b/docs/content.zh/docs/dev/table/materialized-table/deployment.md
@@ -0,0 +1,220 @@
+---
+title: 部署
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/deployment.html
+---
+
+
+# 简介
+
+物化表的执行涉及多个组件的协同工作,相比普通的 Flink 作业,需要额外的配置以确保其正常运行。本文将从架构解析、环境准备、部署流程到操作实践,系统地说明物化表的完整部署方案。
+
+# 架构介绍
+
+- **Client**: 可以是任何能够与 [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) 交互的客户端,如 [SQL 客户端]({{< ref "docs/dev/table/sqlClient" >}})、[Flink JDBC 驱动]({{< ref "docs/dev/table/jdbcDriver" >}}) 等。
+- **Flink SQL Gateway**: 支持创建、修改和删除物化表,并包含一个内置的工作流调度器,用于定期刷新全量模式的物化表。
+- **Flink Cluster**: 用于运行物化表刷新作业的 Flink 集群。
+- **Catalog**: 负责管理物化表元数据的创建、查询、修改和删除。
+- **Catalog Store**: 提供 Catalog 属性持久化功能,以便在操作物化表时自动初始化 Catalog 并获取相关的元数据。
+
+{{< img src="/fig/materialized-table-architecture.svg" alt="Illustration of Flink Materialized Table Architecture" width="85%" >}}
+
+
+# 部署准备
+
+## Flink 集群环境准备
+
+物化表刷新作业目前支持在以下集群环境中运行:
+* [Standalone clusters]({{< ref "docs/deployment/resource-providers/standalone/overview" >}})
+* [YARN clusters]({{< ref "docs/deployment/resource-providers/yarn" >}})
+* [Kubernetes clusters]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}})
+
+## 部署 SQL Gateway
+
+物化表必须通过 SQL Gateway 创建,SQL Gateway 需要针对元数据持久化和作业调度进行特定的配置。
+
+### 配置 Catalog Store
+
+在 `config.yaml` 中增加 Catalog Store 相关配置:
+
+```yaml
+table:
+  catalog-store:
+    kind: file
+    file:
+      path: {path_to_catalog_store} # 替换成实际的路径
+```
+
+更多配置详情可参考 [Catalog Store]({{< ref "docs/dev/table/catalogs" >}}#catalog-store)。
+
+### 配置调度插件
+
+在 `config.yaml` 中增加调度器配置,用于定时调度刷新任务。当前仅支持 `embedded` 调度器:
+
+```yaml
+workflow-scheduler:
+  type: embedded
+```
+
+### 启动 SQL Gateway
+
+使用以下命令启动 SQL Gateway:
+
+```shell
+./sql-gateway.sh start
+```
+
+### 创建 Catalog
+
+尽管可以在创建物化表时创建 Catalog,但我们建议预先创建 Catalog,以实现:
+1. 将 Catalog 配置持久化到 Catalog Store
+2. 确保所有 SQL Gateway session 自动加载 Catalog
+
+**步骤:**
+1. 连接到 SQL Gateway:
+
+```shell
+./sql-client.sh gateway --endpoint {gateway_endpoint}:{gateway_port}
+```
+
+2. 创建 Catalog:
+
+```sql
+Flink SQL> CREATE CATALOG paimon_catalog WITH (
+  'type' = 'paimon',
+  'warehouse' = 'oss://{paimon_warehouse}' -- 替换为实际的路径
+);
+[INFO] Execute statement succeeded.
+```
+
+**注意**:创建的 Catalog 必须支持 Flink 物化表类型。目前,只有 [Paimon catalog](https://paimon.apache.org/docs/master/concepts/table-types/#materialized-table) 支持创建 Flink 物化表。
+
+# 操作指南
+
+## 连接到 SQL Gateway
+
+使用 SQL Client 的示例:
+
+```shell
+./sql-client.sh gateway --endpoint {gateway_endpoint}:{gateway_port}
+```
+
+## 创建物化表
+
+### 在 Standalone 集群运行刷新任务
+
+```sql
+Flink SQL> SET 'execution.mode' = 'remote';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+### 在 session 模式下运行刷新任务
+
+在 session 模式下执行时,需要提前创建 session,具体可以参考文档 [yarn-session]({{< ref "docs/deployment/resource-providers/yarn" >}}#starting-a-flink-session-on-yarn) 和 [kubernetes-session]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#starting-a-flink-session-on-kubernetes)。
+
+**Kubernetes session 模式:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'kubernetes-session';
+[INFO] Execute statement succeeded.
+
+Flink SQL> SET 'kubernetes.cluster-id' = 'flink-cluster-mt-session-1';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+设置 `execution.mode` 为 `kubernetes-session`,并设置参数 `kubernetes.cluster-id` 指向一个已经存在的 Kubernetes session 集群。
+
+**YARN session 模式:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'yarn-session';
+[INFO] Execute statement succeeded.
+
+Flink SQL> SET 'yarn.application.id' = 'application-xxxx';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+设置 `execution.mode` 为 `yarn-session`,并设置参数 `yarn.application.id` 指向一个已经存在的 YARN session 集群。
+
+### 在 application 模式下运行刷新任务
+
+**Kubernetes application 模式:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'kubernetes-application';
+[INFO] Execute statement succeeded.
+
+Flink SQL> SET 'kubernetes.cluster-id' = 'flink-cluster-mt-application-1';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+设置 `execution.mode` 为 `kubernetes-application`。`kubernetes.cluster-id` 是一个可选配置,如果未配置,在提交作业时会自动生成。
+
+**YARN application 模式:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'yarn-application';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+设置 `execution.mode` 为 `yarn-application` 即可。`yarn.application.id` 无需配置,提交作业时会自动生成。
+
+## 运维操作
+
+集群信息(如 `execution.mode` 或 `kubernetes.cluster-id`)已持久化在 Catalog 中,暂停或恢复物化表刷新作业时无需重复设置。
+
+### 暂停刷新任务
+
+```sql
+-- 暂停物化表刷新任务
+Flink SQL> ALTER MATERIALIZED TABLE my_materialized_table SUSPEND;
+[INFO] Execute statement succeeded.
+```
+
+### 恢复刷新任务
+
+```sql
+-- 恢复物化表刷新任务
+Flink SQL> ALTER MATERIALIZED TABLE my_materialized_table RESUME;
+[INFO] Execute statement succeeded.
+```
+
+### 修改查询定义
+
+```sql
+-- 修改物化表查询定义
+Flink SQL> ALTER MATERIALIZED TABLE my_materialized_table
+> AS SELECT
+> ... ;
+
+[INFO] Execute statement succeeded.
+```
\ No newline at end of file
diff --git a/docs/content.zh/docs/dev/table/materialized-table/overview.md b/docs/content.zh/docs/dev/table/materialized-table/overview.md
index c1d381a38da77..9ce5e9ba34f7d 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/overview.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/overview.md
@@ -28,10 +28,6 @@ under the License.
 
 物化表是 Flink SQL 引入的一种新的表类型,旨在简化批处理和流处理数据管道,提供一致的开发体验。在创建物化表时,通过指定数据新鲜度和查询,Flink 引擎会自动推导出物化表的 Schema ,并创建相应的数据刷新管道,以达到指定的新鲜度。
 
-{{< hint warning >}}
-**注意**:该功能目前是一个 MVP(最小可行产品)功能,仅在 [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}})中可用,并且只支持部署作业到 Flink [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}})集群。
-{{< /hint >}}
-
 # 核心概念
 
 物化表包含以下核心概念:数据新鲜度、刷新模式、查询定义和 `Schema` 。
diff --git a/docs/content.zh/docs/dev/table/materialized-table/quickstart.md b/docs/content.zh/docs/dev/table/materialized-table/quickstart.md
index 623373fd6e1d5..023e893542556 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/quickstart.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/quickstart.md
@@ -1,6 +1,6 @@
 ---
 title: 快速入门
-weight: 3
+weight: 4
 type: docs
 aliases:
 - /dev/table/materialized-table/quickstart.html
@@ -29,16 +29,6 @@ under the License.
 
 本入门指南将帮助你快速了解并开始使用物化表。内容包括环境设置,以及创建、修改和删除持续模式和全量模式的物化表。
 
-# 架构介绍
-
-- **Client**: 可以是任何能够与 [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) 交互的客户端,如 [SQL 客户端]({{< ref "docs/dev/table/sqlClient" >}})、[Flink JDBC 驱动]({{< ref "docs/dev/table/jdbcDriver" >}}) 等。
-- **Flink SQL Gateway**: 支持创建、修改和删除物化表。并包含了一个内置的工作流调度器,用于定期刷新全量模式的物化表。
-- **Flink Cluster**: 用于运行物化表刷新作业的 Flink 集群。
-- **Catalog**: 负责管理物化表元数据的创建、查询、修改和删除。
-- **Catalog Store**: 提供 Catalog 属性持久化功能,以便在操作物化表时自动初始化 Catalog 并获取相关的元数据。
-
-{{< img src="/fig/materialized-table-architecture.svg" alt="Illustration of Flink Materialized Table Architecture" width="85%" >}}
-
 # 环境搭建
 
 ## 目录准备
diff --git a/docs/content/docs/dev/table/materialized-table/deployment.md b/docs/content/docs/dev/table/materialized-table/deployment.md
new file mode 100644
index 0000000000000..79175c124c0d0
--- /dev/null
+++ b/docs/content/docs/dev/table/materialized-table/deployment.md
@@ -0,0 +1,225 @@
+---
+title: Deployment
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/deployment.html
+---
+
+
+# Introduction
+
+The operation of Materialized Tables involves collaboration between multiple components. Compared to regular Flink jobs, additional configuration is required to ensure they run properly. This article systematically explains the complete deployment of Materialized Tables, covering the architecture, environment preparation, deployment procedures, and operational practices.
+
+
+# Architecture Introduction
+
+- **Client**: Could be any client that can interact with [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}), such as [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}), [Flink JDBC Driver]({{< ref "docs/dev/table/jdbcDriver" >}}) and so on.
+- **Flink SQL Gateway**: Supports creating, altering, and dropping Materialized Tables. It also includes an embedded workflow scheduler to periodically refresh full mode Materialized Tables.
+- **Flink Cluster**: The pipelines for refreshing Materialized Tables run on the Flink cluster.
+- **Catalog**: Manages the creation, retrieval, modification, and deletion of Materialized Table metadata.
+- **Catalog Store**: Supports catalog property persistence to automatically initialize catalogs for retrieving metadata in Materialized Table related operations.
+
+{{< img src="/fig/materialized-table-architecture.svg" alt="Illustration of Flink Materialized Table Architecture" width="85%" >}}
+
+# Deployment Preparation
+
+## Flink Cluster Setup
+
+Materialized Table refresh jobs currently support execution in these cluster environments:
+* [Standalone clusters]({{< ref "docs/deployment/resource-providers/standalone/overview" >}})
+* [YARN clusters]({{< ref "docs/deployment/resource-providers/yarn" >}})
+* [Kubernetes clusters]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}})
+
+## Flink SQL Gateway Deployment
+
+Materialized Tables must be created through the SQL Gateway, which requires specific configurations for metadata persistence and job scheduling.
+
+### Configure Catalog Store
+
+Add catalog store configurations in `config.yaml` to persist catalog properties.
+
+```yaml
+table:
+  catalog-store:
+    kind: file
+    file:
+      path: {path_to_catalog_store} # Replace with the actual path
+```
+
+Refer to [Catalog Store]({{< ref "docs/dev/table/catalogs" >}}#catalog-store) for details.
+
+### Configure Scheduler Plugin
+
+Add scheduler configurations in `config.yaml` for periodic refresh job scheduling. Currently, only the `embedded` scheduler is supported:
+
+```yaml
+workflow-scheduler:
+  type: embedded
+```
+
+### Start SQL Gateway
+
+Start the SQL Gateway using:
+
+```shell
+./sql-gateway.sh start
+```
+
+### Create Catalog
+
+While catalogs can be created at Materialized Table creation time, we recommend pre-creating them to:
+1. Persist catalog properties in the Catalog Store
+2. Ensure automatic loading by all SQL Gateway sessions
+
+**Steps:**
+1. Connect to SQL Gateway:
+
+```shell
+./sql-client.sh gateway --endpoint {gateway_endpoint}:{gateway_port}
+```
+
+2. Create Catalog:
+
+```sql
+Flink SQL> CREATE CATALOG paimon_catalog WITH (
+  'type' = 'paimon',
+  'warehouse' = 'oss://{paimon_warehouse}' -- Replace with the actual path
+);
+[INFO] Execute statement succeeded.
+```
+
+**Note**: The created catalog must support the Flink Materialized Table type. Currently, only the [Paimon catalog](https://paimon.apache.org/docs/master/concepts/table-types/#materialized-table) supports creating Flink Materialized Tables.
+
+# Operation Guide
+
+## Connecting to SQL Gateway
+
+Example using SQL Client:
+
+```shell
+./sql-client.sh gateway --endpoint {gateway_endpoint}:{gateway_port}
+```
+
+## Creating Materialized Tables
+
+### Refresh Jobs Running on Standalone Cluster
+
+```sql
+Flink SQL> SET 'execution.mode' = 'remote';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+### Refresh Jobs Running in Session Mode
+
+For session modes, pre-create sessions as documented in [yarn-session]({{< ref "docs/deployment/resource-providers/yarn" >}}#starting-a-flink-session-on-yarn) or [kubernetes-session]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#starting-a-flink-session-on-kubernetes).
+
+**Kubernetes session mode:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'kubernetes-session';
+[INFO] Execute statement succeeded.
+
+Flink SQL> SET 'kubernetes.cluster-id' = 'flink-cluster-mt-session-1';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+Set `execution.mode` to `kubernetes-session` and specify a valid `kubernetes.cluster-id` corresponding to an existing Kubernetes session cluster.
+
+**YARN session mode:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'yarn-session';
+[INFO] Execute statement succeeded.
+
+Flink SQL> SET 'yarn.application.id' = 'application-xxxx';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+Set `execution.mode` to `yarn-session` and specify a valid `yarn.application.id` corresponding to an existing YARN session cluster.
+
+### Refresh Jobs Running in Application Mode
+
+**Kubernetes application mode:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'kubernetes-application';
+[INFO] Execute statement succeeded.
+
+Flink SQL> SET 'kubernetes.cluster-id' = 'flink-cluster-mt-application-1';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+Set `execution.mode` to `kubernetes-application`. The `kubernetes.cluster-id` is optional; if not set, it will be automatically generated on job submission.
+
+**YARN application mode:**
+
+```sql
+Flink SQL> SET 'execution.mode' = 'yarn-application';
+[INFO] Execute statement succeeded.
+
+Flink SQL> CREATE MATERIALIZED TABLE my_materialized_table
+> ... ;
+[INFO] Execute statement succeeded.
+```
+
+Only set `execution.mode` to `yarn-application`. The `yarn.application.id` does not need to be set; it will be automatically generated during submission.
+
+## Maintenance Operations
+
+Cluster information (e.g., `execution.mode` or `kubernetes.cluster-id`) is already persisted in the catalog and does not need to be set again when suspending or resuming the refresh jobs of a Materialized Table.
+
+### Suspend Refresh Job
+
+```sql
+-- Suspend the MATERIALIZED TABLE refresh job
+Flink SQL> ALTER MATERIALIZED TABLE my_materialized_table SUSPEND;
+[INFO] Execute statement succeeded.
+```
+
+### Resume Refresh Job
+
+```sql
+-- Resume the MATERIALIZED TABLE refresh job
+Flink SQL> ALTER MATERIALIZED TABLE my_materialized_table RESUME;
+[INFO] Execute statement succeeded.
+```
+
+### Modify Query Definition
+
+```sql
+-- Modify the MATERIALIZED TABLE query definition
+Flink SQL> ALTER MATERIALIZED TABLE my_materialized_table
+> AS SELECT
+> ... ;
+
+[INFO] Execute statement succeeded.
+```
\ No newline at end of file
diff --git a/docs/content/docs/dev/table/materialized-table/overview.md b/docs/content/docs/dev/table/materialized-table/overview.md
index d49492e314a9b..7b78da97a9c82 100644
--- a/docs/content/docs/dev/table/materialized-table/overview.md
+++ b/docs/content/docs/dev/table/materialized-table/overview.md
@@ -28,10 +28,6 @@ under the License.
 
 Materialized Table is a new table type introduced in Flink SQL, aimed at simplifying both batch and stream data pipelines, providing a consistent development experience. By specifying data freshness and query when creating Materialized Table, the engine automatically derives the schema for the materialized table and creates corresponding data refresh pipeline to achieve the specified freshness.
 
-{{< hint warning >}}
-**Note**: This feature is currently an MVP (“minimum viable product”) feature and only available within [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) which connected to a [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}) deployed Flink cluster.
-{{< /hint >}}
-
 # Core Concepts
 
 Materialized Table encompass the following core concepts: Data Freshness, Refresh Mode, Query Definition and Schema.
diff --git a/docs/content/docs/dev/table/materialized-table/quickstart.md b/docs/content/docs/dev/table/materialized-table/quickstart.md
index b5fa5bd642e97..cb4de7d506f29 100644
--- a/docs/content/docs/dev/table/materialized-table/quickstart.md
+++ b/docs/content/docs/dev/table/materialized-table/quickstart.md
@@ -1,6 +1,6 @@
 ---
 title: Quickstart
-weight: 3
+weight: 4
 type: docs
 aliases:
 - /dev/table/materialized-table/quickstart.html
@@ -28,16 +28,6 @@ under the License.
 
 This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment and creating, altering, and dropping materialized tables in CONTINUOUS and FULL mode.
 
-# Architecture Introduction
-
-- **Client**: Could be any client that can interact with [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}), such as [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}), [Flink JDBC Driver]({{< ref "docs/dev/table/jdbcDriver" >}}) and so on.
-- **Flink SQL Gateway**: Supports creating, altering, and dropping materialized tables. It also serves as an embedded workflow scheduler to periodically refresh full mode materialized tables.
-- **Flink Cluster**: The pipeline for refreshing materialized tables will run on the Flink cluster.
-- **Catalog**: Manages the creation, retrieval, modification, and deletion of the metadata of materialized tables.
-- **Catalog Store**: Supports catalog property persistence to automatically initialize catalogs for retrieving metadata in materialized table related operations.
-
-{{< img src="/fig/materialized-table-architecture.svg" alt="Illustration of Flink Materialized Table Architecture" width="85%" >}}
-
 # Environment Setup
 
 ## Directory Preparation