【Flink1.14实战】Docker环境Flink Sql DataGen 快速开始
DataGen SQL 连接器
DataGen 连接器允许按数据生成规则进行读取。
DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。
DataGen 连接器是内置的。
注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。
创建一个 DataGen 的表
表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。
每个列,都有两种生成数据的方法:
- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。
- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。
CREATE TABLE datagen ( f_sequence INT, f_random INT, f_random_str STRING, ts AS localtimestamp, WATERMARK FOR ts AS ts) WITH ( 'connector' = 'datagen', -- optional options -- 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10')
连接器参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必须 | (none) | String | 指定要使用的连接器,这里是 ‘datagen’。 |
rows-per-second | 可选 | 10000 | Long | 每秒生成的行数,用以控制数据发出速率。 |
fields.#.kind | 可选 | random | String | 指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。 |
fields.#.min | 可选 | (Minimum value of type) | (Type of field) | 随机生成器的最小值,适用于数字类型。 |
fields.#.max | 可选 | (Maximum value of type) | (Type of field) | 随机生成器的最大值,适用于数字类型。 |
fields.#.length | 可选 | 100 | Integer | 随机生成器生成字符的长度,适用于 char、varchar、string。 |
fields.#.start | 可选 | (none) | (Type of field) | 序列生成器的起始值。 |
fields.#.end | 可选 | (none) | (Type of field) | 序列生成器的结束值。 |
实战
基于docker-compose。
1、编辑 docker-compose.yml
version: "3"services: jobmanager: image: flink:1.14.4-scala_2.11 ports: - "8081:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager: image: flink:1.14.4-scala_2.11 depends_on: - jobmanager command: taskmanager scale: 1 environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4
2、创建执行程序
package quick.table;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.*;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;public class TableExample { public static void main(String[] args) throws Exception { String sql="CREATE TABLE source_table (\n" + " user_id INT,\n" + " cost DOUBLE,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='5',\n" + "\n" + " 'fields.user_id.kind'='random',\n" + " 'fields.user_id.min'='1',\n" + " 'fields.user_id.max'='10',\n" + "\n" + " 'fields.cost.kind'='random',\n" + " 'fields.cost.min'='1',\n" + " 'fields.cost.max'='100'\n" + ")\n"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(sql); // 执行查询 Table table = tableEnv.sqlQuery("select * from source_table"); DataStream resultStream = tableEnv.toDataStream(table); // add a printing sink and execute in DataStream API resultStream.print(); env.execute(); }}
3、启动服务
$ docker-compose up -d
4、打印结果
然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。
$ docker-compose logs -f taskmanagertaskmanager_1 | +I[4, 60.06509260823151, 2022-04-13T11:01:30.349]taskmanager_1 | +I[9, 22.444427031038334, 2022-04-13T11:01:30.349]......
源码已提交