> 技术文档 > 《我今年写过的最高效的大数据开发程序 --FlinkSinkToHbaseES》项目实战_大数据开发实战项目

《我今年写过的最高效的大数据开发程序 --FlinkSinkToHbaseES》项目实战_大数据开发实战项目

#闲聊 #个人大数据真实业务场景分享 #鱼友

这是一个真实的大数据业务案例,集群:华为大数据 (能说这个不)

数据写入效率:约300万条 /2mins

Flink流式任务 已上线某行内运行:当前正常

接下来说一下配置过程和业务需求,及程序亮点

业务场景:

大数据类:sink Elasticsearch 、sink HBase

业务组件Flink、Hive、HDFS、HBase、ElasticSearch

场景将Hive数据写出到HBase +ElasticSearch,业务侧使用ElasticSearch检索数据,真实数据是存在了HBase中,那么业务侧查询的数据其实就是HBase中的数据,ElasticSearch在这个过程中发挥的作用是倒排索引,构建HBase的二级索引,最终秒级查询HBase中的数据,对接BI展示报表

业务构图:

hive卸数文件 --> hbase数据 --> ES索引(包含检索字段) 如图:

对比常规业务亮点:

  1. 常规此类业务程序是串行的:通过MapReducer通过bulkLoad程序写入数据到HBase,通过Scan hbase表生成json文件 再通过bulk写入ES

  2. 我的程序是并行:可支持同时Sink两端数据写出,1.sink Elasticsearch 2.sink HBase,两侧同时入数,并发写出,而非串行

  3. 计算引擎对比:常规是MapReducer类型,我的是Flink流式计算

  4. 改造:多次测试 写入ES效率不是很高,后续这里我做了改进    

  5. 技术框架更为新颖

数据写入效率提升:

  1. 写入效率取决于数据总条数/每条数据大小
  2. 当前我在公司测试 出的效率:约300万条 /2mins

配置过程

运行任务示例:

1.per-job流式运行:flink run -t yarn-per-job -p 10 -ytm 2048 -ys 1 -yt ssl/ -Dyarn.ship-files=\"/opt/client/Flink/flink/conf\" --class com.mrs.bigdata.flink.sink.SinkToHbaseES /opt/client/Flink/flink/FlinkSink.jar --sourceParallelism 10 --tableName \'default:no_epd_bnk_fin_prd\' --confDir /opt/client/HBase/hbase/conf/ --hdfsFilePath /tmp/2022/ --hbasePropFile /tmp/2022/hbaseConfDir/no_epd_bnk_fin_prd.properties --esConfFile /tmp/2022/esConfDir/no_epd_bnk_fin_prd_mapping.properties2.Flink客户端与YARN分离模式运行: flink run -d xx 批量任务的话,就采用这种运行方式

配置文件(都为hdfs文件,需上传):

配置文件(都为hdfs文件,需上传):

  • Hbase:

/opt/client/Flink/flink/conf/hbaseConfDir/no_epd_bnk_fin_prd.properies

hbase_info=HBASE_ROW_KEY,c#cloumn1,c#cloumn2,c#cloumn3,c#cloumn4,c#cloumn5,c#cloumn6,c#cloumn7,c#cloumn8,c#cloumn9,c#cloumn10,c#cloumn11,c#cloumn12
  • ES:

  1. /opt/client/Flink/flink/conf/esConfDir/no_epd_bnk_fin_prd_mapping.properties

#认证相关配置:#对应自己创建的机机用户名Principal = userName#manager上下载对应机机用户名的krb5.conf 和user.keytab ,需要放在linux环境Flink客户端下的conf/目录Krb5.conf = /opt/client/Flink/flink/conf/krb5.confUser.keytab = /opt/client/Flink/flink/conf/user.keytab#ES配置:#配置自己环境ES服务的 hostES.esServerHost = ip1:24100,ip3:24100,ip2:24100#配置最大重试超时时间ES.MaxRetryTimeoutMillis = 300000#配置客户端和服务器建立连接的超时时间ES.ConnectTimeout = 5000#配置客户端从服务器读取数据的超时时间ES.SocketTimeout = 60000#是否是安全模式ES.isSecureMode = true#自定义的ES索引的type,要与配置文件mapping.json中的对应ES.type = doc#建立ES索引依赖的json文件路径ES.mapping.json = conf/esConfDir/no_epd_bnk_fin_prd_mapping.json#ES的索引keys 数字代表key在hdfs columns中的顺序,顺序严格一致ES.mapping = HBASE_ROW_KEY,1#client_no,2#branch_no,3#fin_nameES.indexName = hcqs.xx_fin_prd

2./opt/client/Flink/flink/conf/esConfDir/no_epd_bnk_fin_prd_mapping.json

示例:

{ \"settings\": { \"number_of_shards\": 3, \"number_of_replicas\": 1 }, \"mappings\": { \"properties\": { \"client_no\": { \"type\": \"keyword\" }, \"branch_no\": { \"type\": \"keyword\" }, \"fin_name\": { \"type\": \"keyword\" } } }}

20241229调整:

  1. 任务类型选择 :增加sinkPoint chooses: 1/Hbase,2/HbaseAndEs,3/Es

  2. source端并发度: 等于文件个数

  3. source端并发度小于sink端,则部分并发子任务存在空闲

  4. source端数据拉取:采用buffer,比如1000条拉取1次

  5. 排除项:hdfs空文件

  6. es bulk批量:批量操作

  7. 速度:100万条/分钟 (sink端并发度:15)

  8. 删除:sink结束 删除源文件

20250126更新:

  1. source端无文件情况,可正常结束

  2. 涉及两集群 存在FS配置文件选择项目配置

    YARN资源池: 1集群

配置文件上传:全放置在1集群HDFS

参数调整confDir:包含HDFS、HBase配置文件 将此分开

        原始:

   --confDir /opt/client/HBase/hbase/conf/

         ==>

        当前:

         hdfsConfDir :1集群hdfs配置文件

         hbaseConfDir:2集群hbase配置文件

   --hdfsConfDir /opt/client/HDFS/hadoop/etc/hadoop/ --hbaseConfDir /opt/client/HBase/hbase/conf/

参数列表:

  chooses 1/Hbase,2/HbaseAndEs,3/Es

  hbase tableName

  hdfs conf dir

  hbase conf dir

  path hdfs file

  path hbasePropFile

  path esConfFile

最后

谢谢大家 到此结束

@500佰