FlinkSQL通解
参考文档
https://blog.csdn.net/be_racle/article/details/135921061?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522604e8b91e59f598cb3c69ae05c0628f7%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=604e8b91e59f598cb3c69ae05c0628f7&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allfirst_rank_ecpm_v1~rank_v31_ecpm-20-135921061-null-null.142v101pc_search_result_base6&utm_term=FlinkSQL&spm=1018.2226.3001.4187
1. 基本原理
2. 编码套路
使用Flink SQL时,我们通常会遵循如下编码套路,这些套路和使用Flink API的套路是一样的:
-
环境准备:初始化一个TableEnvironment对象,它是执行Flink SQL语句的核心。这个环境可以是流数据环境,也可以是批数据环境。
-
数据源定义:通过CREATE TABLE语句定义输入数据源(source),可以是Kafka、CSV文件等。
-
数据处理:编写SQL语句对数据进行处理,如查询、过滤、聚合等。
-
数据输出:通过CREATE TABLE定义输出数据源(sink),并将处理结果输出。
3、相关语法
Create关键字
Create语句用于向当前或指定的Catalog中注册库、表、视图或者函数,注册后的库、表、视图可以在后期的SQL查询中进行使用。
目前FlinkSQL中支持以下Create语句:
-
Create Table
-
Create DataBase
-
Create View
-
Create Function
官方定义
根据指定的表名创建一个表,如果同名表已经在CataLog中存在了,则无法创建。
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n] [ <watermark_definition> ] [ <table_constraint> ][ , ...n] ) [COMMENT table_comment] [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] WITH (key1=val1, key2=val2, ...) [ LIKE source_table [( <like_options> )] ] <physical_column_definition>: -- 物理列定义 column_name column_type [ <column_constraint> ] [COMMENT column_comment] <column_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED<table_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED<metadata_column_definition>: column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]<computed_column_definition>: column_name AS computed_column_expression [COMMENT column_comment]<watermark_definition>: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression<source_table>: [catalog_name.][db_name.]table_name<like_options>:{ { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS } | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } }[, ...]
列定义
(1) 常规列
常规列即物理列,定义了物理介质中存储的数据中字段的名称、类型与顺序。其他类型的列也可以在物理列之间进行声明,但是不会影响最终物理列的读取。
形式举例
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING) WITH ( ...);
(2) 元数据列
元数据列是SQL标准的拓展,允许访问数据源本身具有的一些元数据,元数据列由METADATA关键字标识。
例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。
CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 读取 kafka 本身自带的时间戳 `record_time` TIMESTAMP_LTZ(3) METADATA FROM \'timestamp\') WITH ( \'connector\' = \'kafka\' ...);-- 后期数据处理INSERT INTO MyTable SELECT user_id , name , record_time + INTERVAL \'1\' SECOND FROM MyTable; -- 如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。 CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 读取 kafka 本身自带的时间戳 `timestamp` TIMESTAMP_LTZ(3) METADATA) WITH ( \'connector\' = \'kafka\' ...);
需要注意的是,每种Connecor提供的METADATA字段不一样,需要参考https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。
默认情况下,FlinkSQL认为meta列是可以读取也可以写入的,但是有些外部存储系统的元数据只能用于读取不能写入。
可以选择使用VIRTUAL关键字来标识某个元数据列不写入外部存储中。
CREATE TABLE MyTable ( -- sink 时会写入 `timestamp` BIGINT METADATA, -- sink 时不写入 `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `name` STRING,) WITH ( \'connector\' = \'kafka\' ...);
这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset 列,否则 Flink SQL 任务会直接报错。
(3) 计算列
计算列在写建表的DDL时,可以拿已有的一些列经过一些自定义的运算生成的新列。
CREATE TABLE MyTable ( `user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, -- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity `cost` AS price * quanitity,) WITH ( \'connector\' = \'kafka\' ...);
需要注意的是:如果只是简单的四则运算的话可以直接写在DML中就可以,但是计算列一些用于定义时间属性。把输入数据的时间格式标准化,处理时间、事件时间举例如下:
-
处理时间:使用PROCTIME()函数来定义处理时间列。
-
事件时间:事件时间的时间戳可以在声明WaterMark之前进行预处理
需要注意的是:和虚拟列类型,计算列只能读取不能写入。
WaterMark定义
WaterMark是在Create Table中进行定义的,具体SQL语法标准是:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
-
rowtime_column_name:表的事件时间属性字段。该列必须是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 类,这个时间可以是一个计算列。
-
watermark_strategy_expression:定义 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。
注意:
如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。
Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由 pipeline.auto-watermark-interval 进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。
FlinkSQL定义的WATERMARK生产策略
-
有界无序:设置方式:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。此类策略用于设置最大的乱序时间,
-
严格升序: 设置方式为:。一般基本不用这种方式,如果你能保证你的数据源的时间戳是严格升序的,认为时间戳只会越来越大,也不存在相等的情况,只有相等或者小于之前的,认为迟到的数据。
-
递增:设置方式为:。一般基本不用这种方式,如果设置此类,则允许tyou
With定义
在建表时,描述数据源、数据汇的具体外部存储的元数据信息。
一般的With的配置项由FlinkSQL的Connector来定义,每种Connector提供的With配置项都不同。内置连接器参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/
CREATE TABLE KafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, `ts` TIMESTAMP(3) METADATA FROM \'timestamp\') WITH ( \'connector\' = \'kafka\', -- 外部存储的方式 \'topic\' = \'user_behavior\',-- 主题信息 \'properties.bootstrap.servers\' = \'localhost:9092\', \'properties.group.id\' = \'testGroup\', -- 使用哪个组消费 \'scan.startup.mode\' = \'earliest-offset\',-- 消费方式 \'format\' = \'csv\'-- 在读入与写出时的序列化方式)
Like定义
like字句是Create Table字句的一个延申。
CREATE TABLE Orders ( `user` BIGINT, product STRING, order_time TIMESTAMP(3)) WITH ( \'connector\' = \'kafka\', \'scan.startup.mode\' = \'earliest-offset\');CREATE TABLE Orders_with_watermark ( -- 1. 添加了 WATERMARK 定义 WATERMARK FOR order_time AS order_time - INTERVAL \'5\' SECOND ) WITH ( -- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数 \'scan.startup.mode\' = \'latest-offset\')-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表LIKE Orders;
4. 代码示例
FlinkSQL第一个例子
第一步:引用依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.17.0</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.17.0</version></dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>1.17.0</version> </dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.17.0</version> </dependency><dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>1.17.0</version> </dependency>
第二步:编写代码
public static void main(String[] args) throws Exception { // 加载环境依赖 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings build = EnvironmentSettings.newInstance() .inBatchMode() // 批处理模式 .build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, build); // 定义数据源 String createSourceTableDdl = \"CREATE TABLE csv_source (\" + \" user_id INT,\" + \" product STRING,\" + \" order_amount DOUBLE\" + \") WITH (\" + \" \'connector\' = \'filesystem\',\" + \" \'path\' = \'file:///E:/input.csv\',\" + \" \'format\' = \'csv\'\" + \")\"; tableEnvironment.executeSql(createSourceTableDdl); /*String query = \"SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id\"; tableEnvironment.executeSql(query).print(); env.execute(\"flink SQL Demo\");*/ // 定义输出源 String createSinkTableDdl = \"CREATE TABLE csv_sink (\" + \" user_id INT,\" + \" total_amount DOUBLE\" + \") WITH (\" + \" \'connector\' = \'filesystem\',\" + \" \'path\' = \'file:///E:/output.csv\',\" + \" \'format\' = \'csv\'\" + \")\"; tableEnvironment.executeSql(createSinkTableDdl); // 将执行结果输入到sink中 String query = \"INSERT INTO csv_sink \" + \"SELECT user_id, SUM(order_amount) as total_amount \" + \"FROM csv_source \" + \"GROUP BY user_id\"; tableEnvironment.executeSql(query).print(); //env.execute();}
附录
ProcTime函数
是指Flink算子执行具体操作时的机器系统时间,通常以毫秒为单位。
Watermark机制
概念
Flink SQL中的Watermark用于处理流数据中的时间相关问题,特别是在无界流中,时间的延迟、乱序以及处理的顺序会对结果产生影响。Watermark的主要作用是用来标记事件的最大时间戳,并且帮助Flink确定何时可以触发事件时间上的窗口计算。
具体用时间窗口操作:Watermark可以帮助触发窗口操作的计算。当一个事件的时间戳超过了当前Watermark时,窗口计算会被触发。
-
乱序事件处理:当事件发生顺序与事件流中的实际顺序不一致时(即乱序事件),Watermark可以帮助Flink判断何时可以安全地处理和计算事件。
-
延迟处理:Watermark允许Flink推迟处理窗口,直到某些延迟的事件到达,并且不丢失它们。
具体例子
假设我们有一个流数据系统,记录用户的购买事件,并且我们想按时间窗口统计每个用户在某个时间段内的购买次数。每个事件有一个时间戳,表示事件发生的时间,但由于网络延迟或事件乱序,事件的到达时间可能不同于实际发生的时间。
CREATE TABLE purchases ( user_id STRING, amount DECIMAL(10, 2), event_time TIMESTAMP(3), WATERMARK FOR event_time time - INTERVAL \'5\' SECOND) WITH ( \'connector\' = \'kafka\', \'topic\' = \'user_purchases\', \'properties.bootstrap.servers\' = \'localhost:9092\', \'format\' = \'json\');-- 每10秒的滚动窗口,统计用户购买次数SELECT user_id, COUNT(*) AS purchase_countFROM purchasesGROUP BY user_id, TUMBLE(event_time, INTERVAL \'10\' SECOND);-- WATERMARK FOR event_time AS event_time - INTERVAL \'5\' SECOND:定义了一个Watermark策略,表示Watermark会比事件的时间戳晚5秒,以处理可能出现的延迟事件。-- TUMBLE(event_time, INTERVAL \'10\' SECOND):对event_time进行每10秒的滚动窗口计算。-- 当Flink接收到某个事件时,它会根据事件的时间戳来更新Watermark。Watermark值为事件时间戳减去5秒,这样可以处理一定时间内乱序到达的事件,确保计算窗口时不会遗漏迟到的事件。


