> 技术文档 > Flink 1.20 SQL 物化表(Materialized Table)_flink-1.20.2

Flink 1.20 SQL 物化表(Materialized Table)_flink-1.20.2

Apache Flink 1.20 作为迈向 2.0 时代的最后过渡版本,引入了物化表(Materialized Table)​​ 这一革新性特性。它通过统一的 SQL 接口重构了流批数据处理管道的开发范式,显著降低了实时数据加工的复杂度。本文将深入解析其核心原理与实践价值。

在创建物化表时指定数据新鲜度和查询,引擎会自动推导物化表的模式,并创建相应的数据刷新管道以实现指定的新鲜度。

一、核心概念解析

物化表包含以下核心概念:数据新鲜度、刷新模式、查询定义和模式。

1. 数据新鲜度(Data Freshness)​

        数据新鲜度定义了物化表的内容滞后于基表更新的最大时间量。新鲜度并不能保证。相反,这是Flink试图达到的目标。物化表中的数据在新鲜度范围内尽可能地刷新。

数据新鲜度是物化表的一个关键属性,主要有两个目的:

  1.  确定刷新模式。目前,有连续和完整模式。刷新方式的确定请参见materialize -table.refresh-mode.fresh -threshold配置项。

             连续模式:启动Flink流作业,不断刷新物化表数据。

              FULL模式:工作流调度器定期触发Flink批处理作业以刷新物化表数据。

    2.  确定刷新频率。

              在CONTINUOUS模式下,数据新鲜度被转换为Flink流作业的检查点间隔。

              在FULL模式下,数据新鲜度被转换为工作流的调度周期,例如,cron表达式。

2. 刷新模式(Refresh Mode)

有两种刷新模式:FULL和CONTINUOUS。默认情况下,刷新模式是基于数据新鲜度来推断的。用户可以显式地为特定的业务场景指定刷新模式,这将优先于数据新鲜度推断。

 CONTINUOUS连续模式:Flink流作业增量地更新物化表数据。该数据的可见性取决于相应连接器的行为,要么是立即的,要么是在检查点完成之后。

  FULL模式:调度程序定期触发对物化表数据的完全覆盖,数据刷新周期与工作流的调度周期相匹配。 

  •  默认的覆盖行为是表级的。如果存在分区字段,并且时间分区字段格式通过partition.fields.#指定。日期格式化程序,覆盖是按分区的。每次只刷新最新的分区。

3. 查询定义(Query Definition)​​ 

  • 语法支持​:完整 Flink SQL 语法,包括:
    • 多表关联(含维表 FOR SYSTEM_TIME AS OF 时态 Join)
    • 窗口聚合(TUMBLE/HOP
    • UDF/UDTF 自定义逻辑
  • 结果写入​:
    • CONTINUOUS:增量更新结果
    • FULL:全量覆盖结果

二、语法及使用

1.创建物化表

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name[ ([  ]) ][COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)][WITH (key1=val1, key2=val2, ...)]FRESHNESS = INTERVAL \'\' { SECOND | MINUTE | HOUR | DAY }[REFRESH_MODE = { CONTINUOUS | FULL }]AS 
: [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

⚠️注意:

  1.         PRIMARY KEY定义了一个可选的列列表,用于唯一标识表中的每一行。作为主键的列必须是非空的。
  2.        PARTITIONED BY定义了一个可选的列列表,用于对物化表进行分区。如果将这个物化表用作文件系统接收器,则为每个分区创建一个目录。

2.With语法

  WITH 子句用于定义物化表的存储属性与分区行为,包含两类配置:

  1. 连接器参数​(如 \'format\' = \'json\'):指定数据存储格式及连接器类型(JSON、Parquet 等)。
  2. 分区时间格式化​(如 \'partition.fields.ds.date-formatter\' = \'yyyy-MM-dd\'):仅在 FULL 批模式下生效,用于识别最新分区以实现增量刷新。
\'partition.fields.{分区字段名}.date-formatter\' = \'时间格式\'
  • 适用场景​:仅对 ​**FULL 批模式**生效,CONTINUOUS 流模式无需此配置。
  • 核心价值​:
    • 避免全表覆盖,仅刷新最新分区​(如 ds=\'2025-07-14\'),显著降低 I/O 开销。
    • 支持历史分区回填(REFRESH PARTITION),无需重算全表。

例子:

CREATE MATERIALIZED TABLE my_materialized_tablePRIMARY KEY (ds, id) NOT ENFORCED -- 建议声明主键(非强制)PARTITIONED BY (ds)  -- 分区字段WITH ( \'format\' = \'parquet\', -- 存储格式 \'partition.fields.ds.date-formatter\' = \'yyyy-MM-dd\', -- 时间格式 \'sink.parallelism\' = \'8\' -- 并行度调优)FRESHNESS = INTERVAL \'1\' DAY -- 触发批模式AS SELECT DATE_FORMAT(ts, \'yyyy-MM-dd\') AS ds, -- 必须与格式化器一致 id, SUM(amount) AS revenueFROM ordersGROUP BY ds, id;

3.新鲜度

定义物化表的数据新鲜度。新鲜度和刷新模式的关系

fresh定义了物化表的内容滞后于基表更新的最大时间。它做两件事,首先通过配置确定物化表的刷新方式,然后确定数据刷新频率以满足实际的数据新鲜度需求。

fresh参数说明

参数的取值范围是INTERVAL \'\' {SECOND | MINUTE | HOUR | DAY}。‘’必须是一个正整数,在FULL模式下,‘’应该是各自时间间隔单位的公约数。

示例:(假设materialized-table.refresh-mode.fresh -threshold为30分钟)

-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 secondFRESHNESS = INTERVAL \'1\' SECOND-- The corresponding refresh pipeline is a real-time job with a checkpoint interval of 1 minuteFRESHNESS = INTERVAL \'1\' MINUTE-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hourFRESHNESS = INTERVAL \'1\' HOUR-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 dayFRESHNESS = INTERVAL \'1\' DAY

无效的刷新Examples:

-- Interval is a negative numberFRESHNESS = INTERVAL \'-1\' SECOND-- Interval is 0FRESHNESS = INTERVAL \'0\' SECOND-- Interval is in months or yearsFRESHNESS = INTERVAL \'1\' MONTHFRESHNESS = INTERVAL \'1\' YEAR-- In FULL mode, the interval is not a common divisor of the respective time rangeFRESHNESS = INTERVAL \'60\' SECONDFRESHNESS = INTERVAL \'5\' HOUR

物化的表数据将在定义的新鲜度内尽可能地刷新,但不能保证完全满意。

在CONTINUOUS模式下,设置的数据更新间隔太短可能会影响作业性能,因为它与检查点间隔一致。要优化检查点性能,请考虑启用-changelog。

在FULL模式下,必须将数据的新鲜度转换为cron表达式,因此,当前只容纳预定义时间范围内的新鲜度间隔,这种设计确保与cron的功能保持一致。具体来说,支持以下新鲜度:

秒:间隔30、15、10、5、2和1秒。

分钟:每隔30分钟、15分钟、10分钟、5分钟、2分钟和1分钟。

小时:间隔8、4、2和1小时。

天:1天。

4.刷新模式

REFRESH_MODE用于显式指定物化表的刷新模式。指定的模式优先于框架的自动推理,以满足特定场景的需求。

示例:(假设materialized-table.refresh-mode.fresh -threshold为30分钟)

-- The refresh mode of the created materialized table is CONTINUOUS, and the job\'s checkpoint interval is 1 hour.CREATE MATERIALIZED TABLE my_materialized_table FRESHNESS = INTERVAL \'1\' HOUR REFRESH_MODE = CONTINUOUS AS SELECT ... -- The refresh mode of the created materialized table is FULL, and the job\'s schedule cycle is 10 minutes.CREATE MATERIALIZED TABLE my_materialized_table FRESHNESS = INTERVAL \'10\' MINUTE REFRESH_MODE = FULL AS SELECT ... 

5.AS 语法

该子句用于定义填充物化视图数据的查询。上游表可以是物化表、表或视图。select语句支持所有Flink SQL查询。

CREATE MATERIALIZED TABLE my_materialized_table FRESHNESS = INTERVAL \'10\' SECOND AS SELECT * FROM kafka_catalog.db1.kafka_table;

1:订单明细实时宽表(CONTINUOUS 模式)​

-- 创建含 3 分钟新鲜度的物化表CREATE MATERIALIZED TABLE order_detail_view PRIMARY KEY (order_id) NOT ENFORCED FRESHNESS = INTERVAL \'3\' MINUTE AS SELECT o.order_id, u.vip_level, i.product_price, SUM(i.quantity) OVER(PARTITION BY o.user_id) AS total_quantity FROM orders o JOIN users FOR SYSTEM_TIME AS OF o.proctime AS u ON o.user_id=u.user_id JOIN order_items i ON o.order_id=i.order_id;

优势​:

  • 自动构建 CDC 源表到结果表的流管道
  • 状态仅需维护短期增量数据,非永久全量状态
场景 2:日级销售报表(FULL 模式)
CREATE MATERIALIZED TABLE daily_sales PARTITIONED BY (sale_date) FRESHNESS = INTERVAL \'1\' DAY AS SELECT DATE_FORMAT(order_time,\'yyyy-MM-dd\') AS sale_date, SUM(price*quantity) AS revenue FROM orders GROUP BY sale_date;

优势​:

  • 每日批量覆盖分区数据,避免流计算资源浪费
  • 支持手动刷新历史分区(REFRESH PARTITION

6.ALTER 物化表

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]

 7.SUSPEND用于暂停物化表

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND
Example:-- Specify SAVEPOINT path before pausingSET \'execution.checkpointing.savepoint-dir\' = \'hdfs://savepoint_path\';-- Suspend the specified materialized tableALTER MATERIALIZED TABLE my_materialized_table SUSPEND;

 ⚠️注意:当以CONTINUOUS模式挂起表时,默认情况下将使用STOP WITH SAVEPOINT暂停作业。您需要使用参数设置SAVEPOINT保存路径。

8.恢复 

ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=val1, key2=val2, ...)]

RESUME用于恢复物化表的刷新管道。具体化的表动态选项可以通过WITH options子句指定,它只对当前刷新的管道生效,而不是持久的。

Example:

-- Resume the specified materialized tableALTER MATERIALIZED TABLE my_materialized_table RESUME;-- Resume the specified materialized table and specify sink parallelismALTER MATERIALIZED TABLE my_materialized_table RESUME WITH (\'sink.parallelism\'=\'10\');

9.删除表

DROP MATERIALIZED TABLE [IF EXISTS] [catalog_name.][database_name.]table_name