搭建大型分布式服务(三十一)基于JOLT实现业务型轻量级ETL工具
系列文章目录
文章目录
- 系列文章目录
- 前言
-
-
- 一、本文要点
- 二、开发环境
- 三、Jolt 是什么?
- 四、JOLT实现数据转换
- 五、总体设计
- 六、小结
-
前言
随着需求演进迭代,经常需要把上游进来的一份数据,按照不同数据格式(字段打平、名称修改、类型转换、结构调整、数据过滤)转存或者分发到各应用。而在其中,我们耗费了不少的人力在json to json和sink to where的编码上。
一、本文要点
前面的文章,介绍了Docker搭建各种环境、SpringBoot整合各种中间件、如何把应用部署到云容器,本文开始一个新系列。系列文章完整目录
相信大部分业务都有这样的场景,消费一种MQ消息->转换为另外一种数据格式->转存到DB/REDIS/ES/COS或者推送到另外一个MQ,俗称数据搬运。但由于历史技术债务或者多方团队合作的等原因,项目对接的业务使用的数据格式存在很大差异,需要做不同程度的数据转换,例如:
也就是说,我们需要具备以下数据格式转换能力来适配各业务诉求。
- 字段名称修改
- 字段打平或展开
- 字段类型和值转换
- 字段结构转换
- 字段值过滤
很明显,如果硬编码去做这种数据搬运是乏味而低效的,我们需要一款工具去提效,它需要具备以下核心能力。
- 动态配置数据输入源
- 通过DSL描述JSON->JSON的转换逻辑(非硬编码,动态修改)
- 动态配置输出目标
通过调研发现,云产品SCF+CKAFKA实现的消息转储已经很接近我们的诉求,但它提供的功能仅支持原格式转储,其它格式则需要自行编码开发,跟我们的目标不符。输入和输出的动态配置能力相对比较简单,可以通过封装组件来实现,核心的要素还是在于数据格式的转换。经过一轮Google,发现JOLT比较符合我们组当前的业务场景。
- JSON to JSON
- JSON 结构转换
- Kafka 转存到 ES
- Kafka 转存到 DB
- DB 数据转KAFKA消息
- DB 数据转存ES
二、开发环境
- jdk 1.8
- maven 3.6.2
- springboot 2.4.3
- Jolt 0.1.5
- idea 2020
三、Jolt 是什么?
1、JOLT是什么?
JSON to JSON transformation library written in Java where the “specification” for the transform is itself a JSON document.
- Transforming JSON data from ElasticSearch, MongoDb, Cassandra, etc before sending it off to the world
- Extracting data from a large JSON documents for your own consumption
Jolt 实则上是一个类库,在当今大数据场景主流的ETL pipeline 工具如NIFI和 StreamSets都支持Jolt作为Json转换插件,还有我们熟悉的camel等,它支持我们使用特定的DSL来进行JSON to JSON转换。
2、JOLT支持哪些转换?
shift: copy data from the input tree and put it the output treedefault : apply default values to the treeremove : remove data from the treesort : sort the Map key values alphabetically ( for debugging and human readability )cardinality : "fix" the cardinality of input data. Eg, the "urls" element is usually a List, but if there is only one, then it is a String
Jolt支持复制、删除、修改、排序、对象和数组转换、设置默认值等转换能力,如果上述能力满足不了你的诉求,还可以自定义Java类全路径名称,实现Transform或ContextualTransform接口,定义转换逻辑。甚至可用SpecDriven接口来标记作为Jolt的新spec能力。
四、JOLT实现数据转换
前面提到,Jolt是一个工具类库,它的使用方式比较简单。只需要提前编写好DSL,几行代码就可以实现数据格式转换。
public static void main(String[] args) throws FileNotFoundException { // DSL转换描述文件和输入源json文件 File spec = ResourceUtils.getFile("classpath:spec.json"); File input = ResourceUtils.getFile("classpath:input.json"); // 构造转换器 List chainrSpecJSON = JsonUtils.streamToType(new FileInputStream(spec), new TypeReference<List>() {}); Chainr chainr = Chainr.fromSpec(chainrSpecJSON); // 构造输入源对象 Object inputJSON = JsonUtils.streamToType(new FileInputStream(input), new TypeReference<Map>() {}); // 转换输入源对象为目标格式对象 Object transformedOutput = chainr.transform(inputJSON); System.out.println(JsonUtils.toJsonString(transformedOutput));}
下面通过实战方式解决本文提到5个数据转换问题,截图左边栏为input.json,中间为DSL,右边为输出结果。
- 字段名称修改:修改id名称
- 字段打平或展开:将字段打平到salesInfo
- 字段值转换:Boolean转换为1或者2
- 字段结构转换:数组改为对象
- 字段值过滤:根据字段值过滤
除了支持以上转换方式,实际上Jolt还支持更多方式,如自定义代码,删除字段等,这里就不一一举例。
[ { "operation": "com.mmc.demo.jolt.CustomTransform", // 自定义代码 "spec": { "rating": { "primary": { "value": "Rating" }, "*": { "value": "SecondaryRatings.&1.Value", "$": "SecondaryRatings.&1.Id" } } } }]
五、总体设计
我们的目标是尽可能少编码甚至无需编码就能实现数据转存,开箱即用。但前文已经提到,Jolt侧重于结构的转换,所以我们需要做一些二次封装增强转换能力。同时,由于输出目的地种类较多,在设计上,通过插件化方式提供选择转存目的地。
参考资料
- GitHub:https://github.com/bazaarvoice/jolt
- Demo:http://jolt-demo.appspot.com/#mapToList
- Stack Overflow:https://stackoverflow.com/questions/1618038/xslt-equivalent-for-json
- Blog:https://cloud.tencent.com/developer/article/1690416
- Doc:Jolt PPT 介绍
六、小结
至此,本文介绍了ETL工具项目的背景和总体设计,编码过程和使用方式将在后续文章中慢慢介绍,有兴趣的朋友可以关注一波。下一篇《搭建大型分布式服务(三十二)基于JOLT的ETL工具 - Kafka消息转存到ES》
加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你
![]() |
![]() |
![]() |
《新程序员》:云原生和全面数字化实践
50位技术专家共同创作,文字、视频、音频交互阅读