【Flink1.14实战】Docker环境 DataStream jdbc sink
JDBC Connector
jdbc sink
该连接器可以向 JDBC 数据库写入数据。
添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动):
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.14.4</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.19</version>
</dependency>
创建的 JDBC Sink 提供至少一次(at-least-once)语义保证。在此基础上,通过 upsert 语句或幂等更新,可以高效地实现精确一次(exactly-once)语义。
用法示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
    .fromElements(...)
    .addSink(JdbcSink.sink(
        "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
        (ps, t) -> {
            ps.setInt(1, t.id);
            ps.setString(2, t.title);
            ps.setString(3, t.author);
            ps.setDouble(4, t.price);
            ps.setInt(5, t.qty);
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl(getDbMetadata().getUrl())
            .withDriverName(getDbMetadata().getDriverClass())
            .build()));
env.execute();
实战
1、编辑 docker-compose.yml
version: "2.1"
services:
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
  mysql:
    image: mysql:5.7
    ports:
      - "3306:3306"
    volumes:
      - ./data:/var/lib/mysql
      - ./mysql-init:/docker-entrypoint-initdb.d
    command: [
      'mysqld',
      '--innodb-buffer-pool-size=80M',
      '--character-set-server=utf8mb4',
      '--collation-server=utf8mb4_unicode_ci',
      '--default-time-zone=+8:00',
      '--lower-case-table-names=1',
      '--skip-name-resolve'
    ]
    environment:
      MYSQL_USER: "sql-demo"
      MYSQL_PASSWORD: "demo-sql"
      MYSQL_DATABASE: "sql-demo"
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
创建文件夹mysql-init, create-table.sql
CREATE TABLE book (
  id INT NOT NULL,
  title varchar(30),
  author varchar(30),
  price INT,
  PRIMARY KEY (id)
);
2、启动服务
$ docker-compose up -d
3、案例代码
book
package quick.jdbc;public class Book { private int id; private String title; private String author; private int price; public Book(int id, String title, String author, int price) { this.id = id; this.title = title; this.author = author; this.price = price; } @Override public String toString() { return "Book{" + "id=" + id + ", title='" + title + '\'' + ", author='" + author + '\'' + ", price=" + price + '}'; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; }}
job
package quick.jdbc;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.connector.jdbc.JdbcConnectionOptions;import org.apache.flink.connector.jdbc.JdbcSink;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.ArrayList;import java.util.List;public class JdbcSinkExample { public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); String url = params.get("url","jdbc:mysql://mysql:3306/sql-demo?autoReconnect=true&useSSL=true"); String driver = params.get("driver","com.mysql.cj.jdbc.Driver"); String username = params.get("username","sql-demo"); String password = params.get("password","demo-sql"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); List list=new ArrayList(); for(int i=0;i<10;i++){ Book b=new Book(i,"title"+i,"author"+i,i+50); list.add(b); } DataStream dataStream= env .fromElements(list) .flatMap(new FlatMapFunction< List, Book>() { @Override public void flatMap(List value, Collector out)throws Exception { for(Book book: value){out.collect(book); } } }); dataStream.addSink(JdbcSink.sink( "insert into book (id, title, author, price) values (?,?,?,?)", (ps, t) -> { ps.setInt(1, t.getId()); ps.setString(2, t.getTitle()); ps.setString(3, t.getAuthor()); ps.setInt(4, t.getPrice()); }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(url) .withDriverName(driver) .withUsername(username) .withPassword(password) .build())); env.execute("jdbcsink job"); }}
4、运行
然后,将打包好的应用程序提交到集群:通过 Flink 的 Web UI 提交作业,并监控集群状态和正在运行的作业。
从文件夹中启动 docker-compose 脚本。
$ docker-compose up -d
您可以通过Flink 控制台查看有关正在运行的作业的信息。
从 MySQL 内部探索结果。
$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
mysql> use sql-demo;
Database changed
mysql> select count(*) from book;
+----------+
| count(*) |
+----------+
|       10 |
+----------+