> 文档中心 > 数据湖(六):Hudi与Flink整合

数据湖(六):Hudi与Flink整合

文章目录

Hudi与Flink整合

一、maven pom.xml导入如下包

二、Flink 写入数据到Hudi代码


Hudi与Flink整合

Hudi0.8.0版本与Flink1.12.x之上版本兼容,目前经过测试,Hudi0.8.0版本开始支持Flink,通过Flink写数据到Hudi时,必须开启checkpoint,至少有5次checkpoint后才能看到对应hudi中的数据。

但是应该是有一些问题,目前问题如下:

  • 在本地执行Flink代码向Flink写数据时,存在“java.lang.AbstractMethodError: Method org/apache/hudi/sink/StreamWriteOperatorCoordinator.notifyCheckpointComplete(J)V is abstract”错误信息,预计是hudi版本支持问题。
  • 写入到Flink中的数据,如果使用Flink读取出来,会有对应的错误:“Exception in thread "main" org.apache.hudi.exception.HoodieException: Get table avro schema error”,这个错误主要是由于上一个错误导致Hudi中没有commit信息,在内部读取时,读取不到Commit信息导致。

一、maven pom.xml导入如下包

    UTF-8    1.8    1.8    1.12.1         org.apache.hudi hudi-flink-bundle_2.11 0.8.0         org.apache.flink flink-clients_2.11 ${flink.version}             org.apache.flink flink-java ${flink.version}         org.apache.flink flink-streaming-java_2.11 ${flink.version}             org.apache.flink flink-scala_2.11 ${flink.version}         org.apache.flink flink-streaming-scala_2.11 ${flink.version}                org.apache.hadoop    hadoop-client    2.9.2             org.apache.flink flink-statebackend-rocksdb_2.11 ${flink.version}             org.apache.flink flink-connector-kafka_2.11 ${flink.version}         org.apache.flink flink-csv 1.12.1             org.apache.flink flink-table-planner_2.11 ${flink.version}         org.apache.flink flink-table-api-scala-bridge_2.11 ${flink.version}             org.apache.flink flink-table-planner-blink_2.11 ${flink.version}    

二、Flink 写入数据到Hudi代码

//1.创建对象    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,EnvironmentSettings.newInstance()    .useBlinkPlanner().inStreamingMode().build())    import org.apache.flink.streaming.api.scala._    //2.必须开启checkpoint 默认有5个checkpoint后,hudi目录下才会有数据,不然只有一个.hoodie目录。    env.enableCheckpointing(2000)//    env.setStateBackend(new RocksDBStateBackend("hdfs://mycluster/flinkstate"))    //3.设置并行度    env.setParallelism(1)    //4.读取Kakfa 中的数据    tableEnv.executeSql(      """ | create table kafkaInputTable( |  id varchar, |  name varchar, |  age int, |  ts varchar, |  loc varchar | ) with ( |  'connector' = 'kafka', |  'topic' = 'test_tp', |  'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092', |  'scan.startup.mode'='latest-offset', |  'properties.group.id' = 'testgroup', |  'format' = 'csv' | )      """.stripMargin)    val table: Table = tableEnv.from("kafkaInputTable")    //5.创建Flink 对应的hudi表    tableEnv.executeSql(      """ |CREATE TABLE t1( |  id VARCHAR(20) PRIMARY KEY NOT ENFORCED,--默认主键列为uuid,这里可以后面跟上“PRIMARY KEY NOT ENFORCED”指定为主键列 |  name VARCHAR(10), |  age INT, |  ts VARCHAR(20), |  loc VARCHAR(20) |) |PARTITIONED BY (loc) |WITH ( |  'connector' = 'hudi', |  'path' = '/flink_hudi_data', |  'write.tasks' = '1', -- default is 4 ,required more resource |  'compaction.tasks' = '1', -- default is 10 ,required more resource |  'table.type' = 'COPY_ON_WRITE' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE |)      """.stripMargin)    //6.向表中插入数据    tableEnv.executeSql(      s"""  | insert into t1 select id,name,age,ts,loc from ${table}      """.stripMargin)    env.execute()

以上代码需要注意“PRIMARY KEY NOT ENFORCED”可以不指定,如果不指定hudi对应的主键列默认是“uuid”,指定后可以使用自定义的列名当做主键。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨