> 文档中心 > 【Flink1.14实战】Docker环境 Print SQL 连接器

【Flink1.14实战】Docker环境 Print SQL 连接器


Print SQL 连接器

Print 连接器允许将每一行写入标准输出流或者标准错误流。

设计目的:

  • 简单的流作业测试。
  • 对生产调试带来极大便利。
如何创建一张基于 Print 的表
CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING, f3 DOUBLE) WITH ( 'connector' = 'print')

或者,也可以通过 LIKE子句基于已有表的结构去创建新表。

CREATE TABLE print_table WITH ('connector' = 'print')LIKE source_table (EXCLUDING ALL)
实战

基于docker-compose。

1、编辑 docker-compose.yml

version: "3"services:  jobmanager:    image: flink:1.14.4-scala_2.11    ports:      - "8081:8081"    command: jobmanager    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager   taskmanager:    image: flink:1.14.4-scala_2.11    depends_on:      - jobmanager    command: taskmanager    scale: 1    environment:      - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4   

2、创建执行程序

package quick.table;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;public class PrintSqlExample {    public static void main(String[] args) throws Exception { String sql="CREATE TABLE source_table (\n" +  "    user_id INT,\n" +  "    cost DOUBLE,\n" +  "    ts AS localtimestamp,\n" +  "    WATERMARK FOR ts AS ts\n" +  ") WITH (\n" +  "    'connector' = 'datagen',\n" +  "    'rows-per-second'='5',\n" +  "\n" +  "    'fields.user_id.kind'='random',\n" +  "    'fields.user_id.min'='1',\n" +  "    'fields.user_id.max'='10',\n" +  "\n" +  "    'fields.cost.kind'='random',\n" +  "    'fields.cost.min'='1',\n" +  "    'fields.cost.max'='100'\n" +  ")\n"; String sinkSql="CREATE TABLE print_table (\n" +  " user_id INT,\n" +  " cost DOUBLE\n" +  ") WITH (\n" +  " 'connector' = 'print'\n" +  ")"; EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.executeSql(sql); tEnv.executeSql(sinkSql); // 执行查询 Table table = tEnv.sqlQuery("select user_id,cost from source_table"); table.executeInsert("print_table");    }}

3、启动服务

$ docker-compose up -d

4、打印结果

然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。
在这里插入图片描述

$ docker-compose logs -f taskmanager

在这里插入图片描述

【Flink1.14实战】Docker环境 Print SQL 连接器

解梦吧