【Flink1.14实战】Docker环境 Print SQL 连接器
Print SQL 连接器
Print 连接器允许将每一行写入标准输出流或者标准错误流。
设计目的:
- 简单的流作业测试。
- 对生产调试带来极大便利。
如何创建一张基于 Print 的表
CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING, f3 DOUBLE) WITH ( 'connector' = 'print')
或者,也可以通过 LIKE子句基于已有表的结构去创建新表。
CREATE TABLE print_table WITH ('connector' = 'print')LIKE source_table (EXCLUDING ALL)
实战
基于docker-compose。
1、编辑 docker-compose.yml
version: "3"services: jobmanager: image: flink:1.14.4-scala_2.11 ports: - "8081:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager: image: flink:1.14.4-scala_2.11 depends_on: - jobmanager command: taskmanager scale: 1 environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4
2、创建执行程序
package quick.table;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;public class PrintSqlExample { public static void main(String[] args) throws Exception { String sql="CREATE TABLE source_table (\n" + " user_id INT,\n" + " cost DOUBLE,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='5',\n" + "\n" + " 'fields.user_id.kind'='random',\n" + " 'fields.user_id.min'='1',\n" + " 'fields.user_id.max'='10',\n" + "\n" + " 'fields.cost.kind'='random',\n" + " 'fields.cost.min'='1',\n" + " 'fields.cost.max'='100'\n" + ")\n"; String sinkSql="CREATE TABLE print_table (\n" + " user_id INT,\n" + " cost DOUBLE\n" + ") WITH (\n" + " 'connector' = 'print'\n" + ")"; EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.executeSql(sql); tEnv.executeSql(sinkSql); // 执行查询 Table table = tEnv.sqlQuery("select user_id,cost from source_table"); table.executeInsert("print_table"); }}
3、启动服务
$ docker-compose up -d
4、打印结果
然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。
$ docker-compose logs -f taskmanager