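FlinkCDC x Paimon data synchronization: fixing the NoClassDefFoundError dependency problem when using the paimon-flink-action jar
Flink versions: 1.19, 1.20 (with 2.0 I could not even get the official demo to run, so I have shelved it for now and will let it mature a while)
Paimon versions: 1.1.1, 1.0.1
FlinkCDC versions: 3.1-3.4
All of the versions above have this bug, and the fix below applies to all of them.
This post uses Flink 1.20, Paimon 1.1.1, and FlinkCDC 3.4.0 as the example.
Background: using Paimon's paimon-flink-action jar to sync a single table or a whole database from MySQL to Paimon.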
Official documentation: Mysql CDC | Apache Paimon, https://paimon.apache.org/docs/1.1/cdc-ingestion/mysql-cdc/
Reproducing the problem:
Following the official guide, I put the following jars into the flink/lib directory:
flink-connector-mysql-cdc-3.4.0.jar
mysql-connector-java-8.0.27.jar
and put paimon-flink-action-1.1.1.jar into the flink/opt directory.
Launch command for the MySQL whole-database sync job:
/opt/flink/bin/flink run \
    /opt/flink/opt/paimon-flink-action-1.1.1.jar \
    mysql_sync_database \
    --warehouse file:///opt/flink/paimon \
    --database xxx \
    --mysql_conf hostname=192.168.xxx.xx \
    --mysql_conf username=root \
    --mysql_conf password=xxx \
    --mysql_conf database-name='xxx' \
    --catalog_conf metastore=filesystem \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=2 \
    --table_conf schema.auto-evolve=true
Error:
java.lang.NoClassDefFoundError: org/apache/flink/cdc/debezium/DebeziumDeserializationSchema
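Reading the paimon-flink-cdc source and tracing which dependency provides this class shows that it lives in a Debezium-related package, which is a transitive dependency of flink-connector-mysql-cdc-3.4.0.jar. Adding that jar to the lib directory only moves the failure on to the next missing transitive dependency.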
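One way to confirm which jar, if any, actually ships a class like the one in the error is to look inside the jars under flink/lib. The following is a minimal sketch, not part of the original setup: `class_entry` and `jars_containing` are hypothetical helper names, and it relies only on the fact that a jar is a zip archive with one `.class` entry per class.

```python
import zipfile
from pathlib import Path


def class_entry(class_name):
    """Turn a dotted or slash-separated class name into its jar entry path."""
    return class_name.strip().replace(".", "/") + ".class"


def jars_containing(lib_dir, class_name):
    """Return the file names of the jars in lib_dir that contain class_name."""
    entry = class_entry(class_name)
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar.name)
    return hits
```

Calling `jars_containing("/opt/flink/lib", "org/apache/flink/cdc/debezium/DebeziumDeserializationSchema")` on the setup described here would be expected to return an empty list while the error persists.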
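A more convenient approach is to build your own fat jar of flink-connector-mysql-cdc-3.4.0.jar and use it in place of the original. That, however, triggers a relocation conflict, because the job looks for the Kafka Connect classes under the relocated prefix org.apache.flink.cdc.connectors.shaded, which a plain fat jar does not provide: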
Caused by: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: Lorg/apache/flink/cdc/connectors/shaded/org/apache/kafka/connect/json/JsonConverter;
……
Caused by: java.lang.NoClassDefFoundError: Lorg/apache/flink/cdc/connectors/shaded/org/apache/kafka/connect/json/JsonConverter;……
Caused by: java.lang.ClassNotFoundException: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
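The class names in these traces appear both in JVM descriptor form (`Lorg/.../JsonConverter;`) and in dotted form. As a small illustration of what the job is actually asking for, the sketch below normalizes either form and splits off the relocation prefix. The helper names are hypothetical; the prefix is taken from the trace itself.

```python
SHADED_PREFIX = "org.apache.flink.cdc.connectors.shaded."


def normalize_class_name(raw):
    """Convert 'Lorg/foo/Bar;' or 'org/foo/Bar' into 'org.foo.Bar'."""
    name = raw.strip()
    # JVM field descriptors wrap the class name in a leading 'L' and trailing ';'.
    if name.startswith("L") and name.endswith(";"):
        name = name[1:-1]
    return name.replace("/", ".")


def split_relocation(raw):
    """Return (relocation_prefix_or_None, original_class_name)."""
    name = normalize_class_name(raw)
    if name.startswith(SHADED_PREFIX):
        return SHADED_PREFIX.rstrip("."), name[len(SHADED_PREFIX):]
    return None, name
```

Applied to the class in the trace, this yields the original `org.apache.kafka.connect.json.JsonConverter` plus the prefix it must be moved under, which is the mapping a shade relocation rule has to express.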
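Final solution:
Step 1: build a fat jar of flink-connector-mysql-cdc-3.4.0.jar that also includes relocation rules for kafka/connect
- In IDEA, create a new empty project named flink-connector-mysql-cdc-fat-3.4.0
- Add the following to the pom file: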
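<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>3.4.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.3.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <relocation>
                                <pattern>org.apache.kafka</pattern>
                                <shadedPattern>org.apache.flink.cdc.connectors.shaded.org.apache.kafka</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>com.fasterxml.jackson</pattern>
                                <shadedPattern>org.apache.flink.cdc.connectors.shaded.com.fasterxml.jackson</shadedPattern>
                            </relocation>
                        </relocations>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                    <exclude>META-INF/NOTICE</exclude>
                                    <exclude>META-INF/LICENSE</exclude>
                                    <exclude>META-INF/*.txt</exclude>
                                    <exclude>META-INF/DEPENDENCIES</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>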
- Run mvn clean install; the fat jar appears in the target directory as flink-connector-mysql-cdc-fat-3.4.0-1.0-SNAPSHOT.jar
- Delete flink-connector-mysql-cdc-3.4.0.jar from the flink/lib directory
- Put the newly built flink-connector-mysql-cdc-fat-3.4.0-1.0-SNAPSHOT.jar there instead
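Before restarting anything, it can be worth checking that the fat jar really contains the two classes the earlier errors complained about, and that no Kafka classes were left under their original package. This is a rough sketch under those assumptions; the function names are hypothetical and the class names are copied from the error messages above.

```python
import zipfile

# Class names taken from the two errors quoted in this post.
REQUIRED_CLASSES = [
    "org.apache.flink.cdc.debezium.DebeziumDeserializationSchema",
    "org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter",
]


def missing_classes(jar_path, required=REQUIRED_CLASSES):
    """Return the required class names that have no .class entry in the jar."""
    with zipfile.ZipFile(jar_path) as zf:
        entries = set(zf.namelist())
    return [c for c in required if c.replace(".", "/") + ".class" not in entries]


def unrelocated_classes(jar_path, prefix="org/apache/kafka/"):
    """Return .class entries still sitting under the original, unrelocated prefix."""
    with zipfile.ZipFile(jar_path) as zf:
        return [n for n in zf.namelist()
                if n.startswith(prefix) and n.endswith(".class")]
```

If both functions return an empty list for target/flink-connector-mysql-cdc-fat-3.4.0-1.0-SNAPSHOT.jar, the shade step most likely did what the pom asks for.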
Step 2: edit flink/conf/config.yaml
- Add the classloader configuration
Standard format:
classloader:
  resolve:
    order: child-first
  parent-first-patterns: |
    org.apache.flink
    org.apache.paimon
    com.ververica
    org.apache.hadoop
    org.apache.log4j
    org.apache.zookeeper
    org.apache.kafka
    javax.annotation
    org.slf4j
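Flink's config.yaml (1.19 and later) treats a nested YAML path as a dotted option name, and the Flink configuration reference lists the class-loading options as classloader.resolve-order and classloader.parent-first-patterns.additional. The sketch below uses a plain Python dict as a stand-in for the parsed YAML, with a hypothetical `flatten` helper, to show which dotted keys a nested block produces so they can be compared against that reference.

```python
def flatten(node, prefix=""):
    """Flatten nested dicts into {'a.b.c': value} form."""
    flat = {}
    for key, value in node.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, dotted))
        else:
            flat[dotted] = value
    return flat


# Stand-in for the nested classloader block (pattern list shortened).
nested = {
    "classloader": {
        "resolve": {"order": "child-first"},
        "parent-first-patterns": "org.apache.flink\norg.apache.paimon\n",
    }
}

for key in flatten(nested):
    print(key)
# prints:
# classloader.resolve.order
# classloader.parent-first-patterns
```

If a printed key does not match an option name in the reference for your Flink version, that setting is likely to be ignored, so it is worth adjusting the nesting until the keys line up.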
Step 3: restart the Flink cluster
- flink/bin/stop-cluster.sh
- flink/bin/start-cluster.sh
Run the launch command again and the error no longer appears.