> 文档中心 > 【Flink1.14实战】Docker环境Flink Sql DataGen 快速开始

【Flink1.14实战】Docker环境Flink Sql DataGen 快速开始


DataGen SQL 连接器

DataGen 连接器允许按数据生成规则进行读取。

DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。

DataGen 连接器是内置的。

注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。

创建一个 DataGen 的表

表的有界性:当表中字段的数据全部生成完成后,source 就结束了。 因此,表的有界性取决于字段的有界性。

每个列,都有两种生成数据的方法:

  • 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。
  • 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。
CREATE TABLE datagen ( f_sequence INT, f_random INT, f_random_str STRING, ts AS localtimestamp, WATERMARK FOR ts AS ts) WITH ( 'connector' = 'datagen', -- optional options -- 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10')
连接器参数
参数 是否必选 默认值 数据类型 描述
connector 必须 (none) String 指定要使用的连接器,这里是 ‘datagen’。
rows-per-second 可选 10000 Long 每秒生成的行数,用以控制数据发出速率。
fields.#.kind 可选 random String 指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。
fields.#.min 可选 (Minimum value of type) (Type of field) 随机生成器的最小值,适用于数字类型。
fields.#.max 可选 (Maximum value of type) (Type of field) 随机生成器的最大值,适用于数字类型。
fields.#.length 可选 100 Integer 随机生成器生成字符的长度,适用于 char、varchar、string。
fields.#.start 可选 (none) (Type of field) 序列生成器的起始值。
fields.#.end 可选 (none) (Type of field) 序列生成器的结束值。
实战

基于docker-compose。

1、编辑 docker-compose.yml

version: "3"services:  jobmanager:    image: flink:1.14.4-scala_2.11    ports:      - "8081:8081"    command: jobmanager    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager   taskmanager:    image: flink:1.14.4-scala_2.11    depends_on:      - jobmanager    command: taskmanager    scale: 1    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4   

2、创建执行程序

package quick.table;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.*;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;public class TableExample {    public static void main(String[] args) throws Exception { String sql="CREATE TABLE source_table (\n" +  "    user_id INT,\n" +  "    cost DOUBLE,\n" +  "    ts AS localtimestamp,\n" +  "    WATERMARK FOR ts AS ts\n" +  ") WITH (\n" +  "    'connector' = 'datagen',\n" +  "    'rows-per-second'='5',\n" +  "\n" +  "    'fields.user_id.kind'='random',\n" +  "    'fields.user_id.min'='1',\n" +  "    'fields.user_id.max'='10',\n" +  "\n" +  "    'fields.cost.kind'='random',\n" +  "    'fields.cost.min'='1',\n" +  "    'fields.cost.max'='100'\n" +  ")\n"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(sql); // 执行查询 Table table = tableEnv.sqlQuery("select * from source_table"); DataStream resultStream = tableEnv.toDataStream(table); // add a printing sink and execute in DataStream API resultStream.print(); env.execute();    }}

3、启动服务

$ docker-compose up -d

4、打印结果
在这里插入图片描述

然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。

$ docker-compose logs -f taskmanagertaskmanager_1  | +I[4, 60.06509260823151, 2022-04-13T11:01:30.349]taskmanager_1  | +I[9, 22.444427031038334, 2022-04-13T11:01:30.349]......

在这里插入图片描述
源码已提交
【Flink1.14实战】Docker环境Flink Sql DataGen 快速开始

体育科学