> 文档中心 > 【Flink1.14实战】Docker环境 DataStream jdbc

【Flink1.14实战】Docker环境 DataStream jdbc


mysql source

从 MySQL 中读取数据的 Source。

首先 pom.xml 中添加 MySQL 依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>

数据库表

-- Recreate the demo table from scratch so the script can be re-run safely.
DROP TABLE IF EXISTS `book`;

CREATE TABLE `book` (
    id     INT NOT NULL,
    title  VARCHAR(30),
    author VARCHAR(30),
    price  INT,
    PRIMARY KEY (id)
);

-- Seed rows. id and price are INT columns, so they are written as numeric
-- literals instead of quoted strings that rely on implicit conversion, and the
-- column list is explicit so the statement does not depend on column order.
INSERT INTO `book` (id, title, author, price) VALUES
    (1, 'zhangsan', '123456', 18),
    (2, 'lishi', '123', 17),
    (3, 'wangwu', '1234', 18),
    (4, 'jam', '12345', 16);

COMMIT;

新建实体类:Book.java

package quick.jdbc;public class Book {    private  int id;    private String title;    private String author;    private int price;    public Book(int id, String title, String author, int price) { this.id = id; this.title = title; this.author = author; this.price = price;    }    @Override    public String toString() { return "Book{" +  "id=" + id +  ", title='" + title + '\'' +  ", author='" + author + '\'' +  ", price=" + price +  '}';    }    public int getId() { return id;    }    public void setId(int id) { this.id = id;    }    public String getTitle() { return title;    }    public void setTitle(String title) { this.title = title;    }    public String getAuthor() { return author;    }    public void setAuthor(String author) { this.author = author;    }    public int getPrice() { return price;    }    public void setPrice(int price) { this.price = price;    }}

新建 Source 类 SourceFromMySQL.java,该类继承 RichSourceFunction,并实现其中的 open、close、run、cancel 方法

package quick.jdbc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Flink source that executes {@code select * from book;} against MySQL and
 * emits one {@link Book} per returned row.
 *
 * <p>The source is typed as {@code RichSourceFunction<Book>} so that the element
 * type is declared on the class instead of being a raw type.
 */
public class SourceFromMySQL extends RichSourceFunction<Book> {

    // JDBC handles are created in open() and released in close(); they are
    // transient because they must never be part of the serialized function.
    transient PreparedStatement ps;
    private transient Connection connection;

    // Set to false by cancel(); volatile because cancel() is invoked from a
    // different thread than the one executing run().
    private volatile boolean running = true;

    /**
     * Opens the JDBC connection and prepares the query once, so that
     * {@link #run} does not have to create them itself.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the driver cannot be loaded, the connection cannot
     *                   be established, or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from book;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * Releases the statement and then the connection. Each step runs in a
     * {@code finally} chain so that a failure while closing one resource does
     * not prevent the remaining ones from being closed.
     *
     * @throws Exception if closing a resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }

    /**
     * Executes the prepared query and emits every row as a {@link Book},
     * stopping early if {@link #cancel()} has been called.
     *
     * @param ctx context used to emit the records
     * @throws Exception if the query or reading a row fails
     */
    @Override
    public void run(SourceContext<Book> ctx) throws Exception {
        // try-with-resources guarantees the ResultSet is closed even when
        // emitting fails or the loop is left early.
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                Book book = new Book(
                        resultSet.getInt("id"),
                        trimOrNull(resultSet.getString("title")),
                        trimOrNull(resultSet.getString("author")),
                        resultSet.getInt("price"));
                ctx.collect(book);
            }
        }
    }

    /** Signals {@link #run} to stop emitting after the current row. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Trims a column value, passing {@code null} through unchanged. The
     * {@code title} and {@code author} columns are nullable, so calling
     * {@code trim()} directly on them could throw a NullPointerException.
     */
    private static String trimOrNull(String value) {
        return value == null ? null : value.trim();
    }

    /**
     * Loads the MySQL driver and opens a connection to the demo database.
     *
     * <p>Failures are propagated to the caller instead of being printed and
     * turned into a {@code null} connection, which would only surface later as
     * a NullPointerException in {@link #open}.
     *
     * @return an open connection, never {@code null}
     * @throws ClassNotFoundException if the driver class is not on the classpath
     * @throws SQLException           if the connection cannot be established
     */
    private static Connection getConnection() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        return DriverManager.getConnection(
                "jdbc:mysql://mysql:3306/sql-demo?useUnicode=true&characterEncoding=UTF-8",
                "sql-demo",
                "demo-sql");
    }
}

Flink 程序:

package quick.jdbc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Entry point of the demo job: attaches {@link SourceFromMySQL} as a source,
 * prints every emitted element, and runs the job under the name
 * {@code "jdbc source job"}.
 */
public class JdbcSourceExample {

    /**
     * Builds and executes the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        SourceFromMySQL bookSource = new SourceFromMySQL();
        environment.addSource(bookSource).print();

        String jobName = "jdbc source job";
        environment.execute(jobName);
    }
}

运行 Flink 程序,在控制台日志中可以看到打印出的 Book 信息:

$ docker-compose logs -f taskmanager
taskmanager_1  | Book{id=0, title='title0', author='author0', price=50}
taskmanager_1  | Book{id=1, title='title1', author='author1', price=51}
taskmanager_1  | Book{id=2, title='title2', author='author2', price=52}