
[Flink 1.14 in Practice] DataStream JDBC sink in a Docker environment


JDBC Connector

JDBC sink

This connector writes data to a JDBC database.

Add the following dependencies to use the connector (the JDBC driver is added alongside it):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>

The JDBC sink provides at-least-once guarantees. Effectively exactly-once semantics can be achieved with upsert statements or idempotent updates.
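For MySQL, one way to make the write idempotent is an upsert. The sketch below is illustrative (it assumes a table with `id` as the primary key, like the `book` table created later in this article); re-executing the same row leaves the table unchanged, so batches replayed after a failure do not produce duplicates:

```sql
-- Hypothetical upsert: replace the plain INSERT passed to JdbcSink.sink
-- with this statement to make retried writes idempotent.
INSERT INTO book (id, title, author, price) VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
    title  = VALUES(title),
    author = VALUES(author),
    price  = VALUES(price);
```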

Usage example:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...)
   .addSink(JdbcSink.sink(
        "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
        (ps, t) -> {
            ps.setInt(1, t.id);
            ps.setString(2, t.title);
            ps.setString(3, t.author);
            ps.setDouble(4, t.price);
            ps.setInt(5, t.qty);
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl(getDbMetadata().getUrl())
            .withDriverName(getDbMetadata().getDriverClass())
            .build()));
env.execute();
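The sink buffers records and writes them in batches. Flushing behavior can be tuned by passing a `JdbcExecutionOptions` as the third argument of `JdbcSink.sink` (a fragment, not a complete program; the values here are illustrative, not recommendations):

```java
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

// Sketch: batching configuration for the JDBC sink (Flink 1.14).
JdbcExecutionOptions execOptions = JdbcExecutionOptions.builder()
        .withBatchSize(1000)       // flush after 1000 buffered records...
        .withBatchIntervalMs(200)  // ...or after 200 ms, whichever comes first
        .withMaxRetries(3)         // retry a failed batch up to 3 times
        .build();

// Passed as: JdbcSink.sink(sql, statementBuilder, execOptions, connectionOptions)
```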

Hands-On

1. Edit docker-compose.yml

version: "2.1"
services:
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4
  mysql:
    image: mysql:5.7
    ports:
      - "3306:3306"
    volumes:
      - ./data:/var/lib/mysql
      - ./mysql-init:/docker-entrypoint-initdb.d
    command: [
      'mysqld',
      '--innodb-buffer-pool-size=80M',
      '--character-set-server=utf8mb4',
      '--collation-server=utf8mb4_unicode_ci',
      '--default-time-zone=+8:00',
      '--lower-case-table-names=1',
      '--skip-name-resolve'
    ]
    environment:
      MYSQL_USER: "sql-demo"
      MYSQL_PASSWORD: "demo-sql"
      MYSQL_DATABASE: "sql-demo"
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"

Create a folder named mysql-init containing create-table.sql:

CREATE TABLE book (
    id     INT NOT NULL,
    title  VARCHAR(30),
    author VARCHAR(30),
    price  INT,
    PRIMARY KEY (id)
);

2. Start the services

$ docker-compose up -d

3. Example code

Book.java

package quick.jdbc;

public class Book {
    private int id;
    private String title;
    private String author;
    private int price;

    public Book(int id, String title, String author, int price) {
        this.id = id;
        this.title = title;
        this.author = author;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", title='" + title + '\'' +
                ", author='" + author + '\'' +
                ", price=" + price +
                '}';
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public String getAuthor() { return author; }
    public void setAuthor(String author) { this.author = author; }
    public int getPrice() { return price; }
    public void setPrice(int price) { this.price = price; }
}

JdbcSinkExample.java

package quick.jdbc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class JdbcSinkExample {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        String url = params.get("url", "jdbc:mysql://mysql:3306/sql-demo?autoReconnect=true&useSSL=true");
        String driver = params.get("driver", "com.mysql.cj.jdbc.Driver");
        String username = params.get("username", "sql-demo");
        String password = params.get("password", "demo-sql");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<Book> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Book(i, "title" + i, "author" + i, i + 50));
        }

        DataStream<Book> dataStream = env
                .fromElements(list)
                .flatMap(new FlatMapFunction<List<Book>, Book>() {
                    @Override
                    public void flatMap(List<Book> value, Collector<Book> out) throws Exception {
                        for (Book book : value) {
                            out.collect(book);
                        }
                    }
                });

        dataStream.addSink(JdbcSink.sink(
                "insert into book (id, title, author, price) values (?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.getId());
                    ps.setString(2, t.getTitle());
                    ps.setString(3, t.getAuthor());
                    ps.setInt(4, t.getPrice());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName(driver)
                        .withUsername(username)
                        .withPassword(password)
                        .build()));

        env.execute("jdbcsink job");
    }
}

4. Run

Then package the application and submit it. Flink's Web UI can be used to submit the job and to monitor the state of the cluster and the running jobs.
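Alternatively, the packaged jar can be submitted from the command line via the jobmanager container (a sketch; the jar path `target/quick-jdbc-1.0.jar` is an assumption based on a default Maven build, not a name from this article):

```
# Copy the hypothetical jar into the jobmanager container, then submit it
# with the Flink CLI (the flink image's working directory is /opt/flink).
$ docker cp target/quick-jdbc-1.0.jar "$(docker-compose ps -q jobmanager)":/job.jar
$ docker-compose exec jobmanager ./bin/flink run -c quick.jdbc.JdbcSinkExample /job.jar
```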

Start the docker-compose services from the project folder (if they are not already running):

$ docker-compose up -d

You can view information about the running job in the Flink console.

Inspect the results from inside MySQL:

$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql
mysql> use sql-demo;
Database changed
mysql> select count(*) from book;
+----------+
| count(*) |
+----------+
|       10 |
+----------+
