【Flink1.14实战】Docker环境 DataStream jdbc
mysql source
从 MySQL 中读取数据的 Source。
首先 pom.xml 中添加 MySQL 依赖:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>
数据库表
-- Recreates the demo `book` table and seeds it with four rows.
DROP TABLE IF EXISTS `book`;

CREATE TABLE `book` (
    id     INT NOT NULL,
    title  VARCHAR(30),
    author VARCHAR(30),
    price  INT,
    PRIMARY KEY (id)
);

-- Columns are listed explicitly and INT columns receive numeric literals,
-- so the statement does not rely on column order or on implicit
-- string-to-number conversion.
INSERT INTO `book` (id, title, author, price) VALUES
    (1, 'zhangsan', '123456', 18),
    (2, 'lishi', '123', 17),
    (3, 'wangwu', '1234', 18),
    (4, 'jam', '12345', 16);

COMMIT;
新建实体类:Book.java
package quick.jdbc;public class Book { private int id; private String title; private String author; private int price; public Book(int id, String title, String author, int price) { this.id = id; this.title = title; this.author = author; this.price = price; } @Override public String toString() { return "Book{" + "id=" + id + ", title='" + title + '\'' + ", author='" + author + '\'' + ", price=" + price + '}'; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; }}
新建 Source 类 SourceFromMySQL.java,该类继承 RichSourceFunction ,实现里面的 open、close、run、cancel 方法:
package quick.jdbc;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Flink source that runs {@code select * from book} once against MySQL and
 * emits one {@link Book} per returned row.
 *
 * <p>The JDBC connection and statement are created in {@link #open} and
 * released in {@link #close}.
 */
public class SourceFromMySQL extends RichSourceFunction<Book> {

    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    private static final String JDBC_URL =
            "jdbc:mysql://mysql:3306/sql-demo?useUnicode=true&characterEncoding=UTF-8";
    private static final String JDBC_USER = "sql-demo";
    private static final String JDBC_PASSWORD = "demo-sql";
    private static final String QUERY = "select * from book;";

    // JDBC handles are not serializable; they are created in open(), so they
    // are excluded from the serialized form of the function.
    private transient Connection connection;
    private transient PreparedStatement ps;

    // Cleared by cancel(); volatile because cancel() and run() may execute on
    // different threads.
    private volatile boolean running = true;

    /**
     * Opens the connection and prepares the query once, so that {@link #run}
     * does not have to set up and tear down a connection itself.
     *
     * @param parameters configuration passed on to the superclass
     * @throws Exception if the driver cannot be loaded or the connection or
     *                   statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        ps = connection.prepareStatement(QUERY);
    }

    /**
     * Releases the statement and then the connection. Each step runs even if
     * the previous one throws, so a failure closing the statement does not
     * leak the connection.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                ps = null;
                connection = null;
                super.close();
            }
        }
    }

    /**
     * Executes the prepared query and emits every row as a {@link Book}.
     * Stops early once {@link #cancel()} has been called.
     *
     * @param ctx context used to emit the records
     * @throws Exception if executing the query or reading a row fails
     */
    @Override
    public void run(SourceContext<Book> ctx) throws Exception {
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                Book book = new Book(
                        resultSet.getInt("id"),
                        trimOrNull(resultSet.getString("title")),
                        trimOrNull(resultSet.getString("author")),
                        resultSet.getInt("price"));
                ctx.collect(book);
            }
        }
    }

    /** Requests that {@link #run} stop emitting rows. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Trims a column value; a SQL NULL (Java {@code null}) is passed through
     * unchanged instead of causing a {@link NullPointerException}.
     */
    private static String trimOrNull(String value) {
        return value == null ? null : value.trim();
    }

    /**
     * Loads the MySQL driver and opens a connection to the demo database.
     *
     * <p>Failures are propagated to the caller rather than logged and turned
     * into a {@code null} connection, which would only fail later with a
     * {@link NullPointerException} that hides the real cause.
     *
     * @return an open connection, never {@code null}
     * @throws ClassNotFoundException if the driver class is not on the classpath
     * @throws SQLException           if the connection cannot be established
     */
    private static Connection getConnection() throws ClassNotFoundException, SQLException {
        Class.forName(JDBC_DRIVER);
        return DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
    }
}
Flink 程序:
package quick.jdbc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Entry point of the example job: wires {@link SourceFromMySQL} to a print
 * sink and submits the job.
 */
public class JdbcSourceExample {

    /** Name under which the job is submitted. */
    private static final String JOB_NAME = "jdbc source job";

    /**
     * Obtains the execution environment, attaches the pipeline and executes it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        attachPipeline(environment);
        environment.execute(JOB_NAME);
    }

    /** Adds the MySQL source to the environment and prints every emitted element. */
    private static void attachPipeline(StreamExecutionEnvironment environment) {
        environment.addSource(new SourceFromMySQL()).print();
    }
}
运行 Flink 程序,控制台日志中可以看见打印的 Book 信息。
$ docker-compose logs -f taskmanager
taskmanager_1 | Book{id=1, title='zhangsan', author='123456', price=18}
taskmanager_1 | Book{id=2, title='lishi', author='123', price=17}
taskmanager_1 | Book{id=3, title='wangwu', author='1234', price=18}
taskmanager_1 | Book{id=4, title='jam', author='12345', price=16}