> 文档中心 > Flink源码分析: Flink JDBC Upsert模式实现原理

Flink源码分析: Flink JDBC Upsert模式实现原理

Flink源码专辑:
Flink源码分析(一): 重启策略机制RestartStrategy
Flink源码分析(二): 广播状态流实现规格更新或字段参数变更

前言:
版权说明:本专栏是作者在日常工作期间对技术的不断深入研究后的沉淀,辛苦码字总结而成,如有问题,欢迎指正.

版本号:Flink1.10.0

在使用Flink SQL模式时,可以选择参数为’update-mode’=‘Upsert’,今天我们就跟着这个参数来看看flink jdbc connector这个功能是如何实现的

一. JDBC的几个Utils

  • JDBCValidator

我们在使用flink sql创建流表时,建表语句如下:

CREATE TABLE pvuv_sink (    dt VARCHAR,    pv BIGINT,    uv BIGINT) WITH (    'connector.type' = 'jdbc', -- 使用 jdbc connector    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url    'connector.table' = 'pvuv_sink', -- 表名    'connector.username' = 'root', -- 用户名    'connector.password' = '123456', -- 密码    'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条)

跟踪源码org.apache.flink.table.descriptors.JDBCValidator可以找到connector可选配置参数,参数主要分为:CommonProperties,LookupProperties,ReadProperties,SinkProperties.

  • JDBCTypeUtil
    这里定义的各种数据库的类型转换,通过静态代码块typeInformationToSqlType实现转换,其他类中会调用该方法JDBCTypeUtil::typeInformationToSqlType.

  • JDBCUtils
    这个方法里封装是jdbc connector所用到的方法,其中包括

setRecordToStatement 写入数据到statement
setField 根据数据类型实现pstmt.set
getFieldFromResultSet 通过JDBC查询返回ResultSet,根据数据类型获取值

二. JDBC的Upsert功能
org.apache.flink.api.java.io.jdbc.writer.UpsertWriter

  1. 该抽象类继承了JDBCWriter接口
  2. 有参构造函数public static UpsertWriter create( JDBCDialect dialect, String tableName, String[] fieldNames, int[] fieldTypes, String[] keyFields)
    其中JDBCDialect实现了不同JDBC Driver的insert,delete,update,upsert,select的动态拼接SQL