《我今年写过的最高效的大数据开发程序 --FlinkSinkToHbaseES》项目实战_大数据开发实战项目
这是一个真实的大数据业务案例,集群:华为大数据 (能说这个不)
数据写入效率:约300万条 /2mins
Flink流式任务 已上线某行内运行:当前正常
接下来说一下配置过程和业务需求,及程序亮点
业务场景:
大数据类:sink Elasticsearch 、sink HBase
业务组件:Flink、Hive、HDFS、HBase、ElasticSearch
场景:将Hive数据写出到HBase +ElasticSearch,业务侧使用ElasticSearch检索数据,真实数据是存在了HBase中,那么业务侧查询的数据其实就是HBase中的数据,ElasticSearch在这个过程中发挥的作用是倒排索引,构建HBase的二级索引,最终秒级查询HBase中的数据,对接BI展示报表
业务构图:
hive卸数文件 --> hbase数据 --> ES索引(包含检索字段) 如图:
对比常规业务亮点:
-
常规此类业务程序是串行的:通过MapReducer通过bulkLoad程序写入数据到HBase,通过Scan hbase表生成json文件 再通过bulk写入ES
-
我的程序是并行:可支持同时Sink两端数据写出,1.sink Elasticsearch 2.sink HBase,两侧同时入数,并发写出,而非串行
-
计算引擎对比:常规是MapReducer类型,我的是Flink流式计算
-
改造:多次测试 写入ES效率不是很高,后续这里我做了改进
-
技术框架更为新颖
数据写入效率提升:
- 写入效率取决于数据总条数/每条数据大小
- 当前我在公司测试 出的效率:约300万条 /2mins
配置过程:
运行任务示例:
1.per-job流式运行:flink run -t yarn-per-job -p 10 -ytm 2048 -ys 1 -yt ssl/ -Dyarn.ship-files=\"/opt/client/Flink/flink/conf\" --class com.mrs.bigdata.flink.sink.SinkToHbaseES /opt/client/Flink/flink/FlinkSink.jar --sourceParallelism 10 --tableName \'default:no_epd_bnk_fin_prd\' --confDir /opt/client/HBase/hbase/conf/ --hdfsFilePath /tmp/2022/ --hbasePropFile /tmp/2022/hbaseConfDir/no_epd_bnk_fin_prd.properties --esConfFile /tmp/2022/esConfDir/no_epd_bnk_fin_prd_mapping.properties2.Flink客户端与YARN分离模式运行: flink run -d xx 批量任务的话,就采用这种运行方式
配置文件(都为hdfs文件,需上传):
配置文件(都为hdfs文件,需上传):
-
Hbase:
/opt/client/Flink/flink/conf/hbaseConfDir/no_epd_bnk_fin_prd.properies
hbase_info=HBASE_ROW_KEY,c#cloumn1,c#cloumn2,c#cloumn3,c#cloumn4,c#cloumn5,c#cloumn6,c#cloumn7,c#cloumn8,c#cloumn9,c#cloumn10,c#cloumn11,c#cloumn12
-
ES:
-
/opt/client/Flink/flink/conf/esConfDir/no_epd_bnk_fin_prd_mapping.properties
#认证相关配置:#对应自己创建的机机用户名Principal = userName#manager上下载对应机机用户名的krb5.conf 和user.keytab ,需要放在linux环境Flink客户端下的conf/目录Krb5.conf = /opt/client/Flink/flink/conf/krb5.confUser.keytab = /opt/client/Flink/flink/conf/user.keytab#ES配置:#配置自己环境ES服务的 hostES.esServerHost = ip1:24100,ip3:24100,ip2:24100#配置最大重试超时时间ES.MaxRetryTimeoutMillis = 300000#配置客户端和服务器建立连接的超时时间ES.ConnectTimeout = 5000#配置客户端从服务器读取数据的超时时间ES.SocketTimeout = 60000#是否是安全模式ES.isSecureMode = true#自定义的ES索引的type,要与配置文件mapping.json中的对应ES.type = doc#建立ES索引依赖的json文件路径ES.mapping.json = conf/esConfDir/no_epd_bnk_fin_prd_mapping.json#ES的索引keys 数字代表key在hdfs columns中的顺序,顺序严格一致ES.mapping = HBASE_ROW_KEY,1#client_no,2#branch_no,3#fin_nameES.indexName = hcqs.xx_fin_prd
2./opt/client/Flink/flink/conf/esConfDir/no_epd_bnk_fin_prd_mapping.json
示例:
{ \"settings\": { \"number_of_shards\": 3, \"number_of_replicas\": 1 }, \"mappings\": { \"properties\": { \"client_no\": { \"type\": \"keyword\" }, \"branch_no\": { \"type\": \"keyword\" }, \"fin_name\": { \"type\": \"keyword\" } } }}
20241229调整:
-
任务类型选择 :增加sinkPoint chooses: 1/Hbase,2/HbaseAndEs,3/Es
-
source端并发度: 等于文件个数
-
source端并发度小于sink端,则部分并发子任务存在空闲
-
source端数据拉取:采用buffer,比如1000条拉取1次
-
排除项:hdfs空文件
-
es bulk批量:批量操作
-
速度:100万条/分钟 (sink端并发度:15)
-
删除:sink结束 删除源文件
20250126更新:
-
source端无文件情况,可正常结束
-
涉及两集群 存在FS配置文件选择项目配置
YARN资源池: 1集群
配置文件上传:全放置在1集群HDFS
参数调整confDir:包含HDFS、HBase配置文件 将此分开
原始:
--confDir /opt/client/HBase/hbase/conf/
==>
当前:
hdfsConfDir :1集群hdfs配置文件
hbaseConfDir:2集群hbase配置文件
--hdfsConfDir /opt/client/HDFS/hadoop/etc/hadoop/ --hbaseConfDir /opt/client/HBase/hbase/conf/
参数列表:
chooses 1/Hbase,2/HbaseAndEs,3/Es
hbase tableName
hdfs conf dir
hbase conf dir
path hdfs file
path hbasePropFile
path esConfFile
最后
谢谢大家 到此结束
@500佰