> 文档中心 > 搭建大型分布式服务(三十一)基于JOLT实现业务型轻量级ETL工具

搭建大型分布式服务(三十一)基于JOLT实现业务型轻量级ETL工具


系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、Jolt 是什么?
      • 四、JOLT实现数据转换
      • 五、总体设计
      • 六、小结

前言

随着需求演进迭代,经常需要把上游进来的一份数据,按照不同数据格式(字段打平、名称修改、类型转换、结构调整、数据过滤)转存或者分发到各应用。而在其中,我们耗费了不少的人力在json to json和sink to where的编码上。


一、本文要点

前面的文章,介绍了Docker搭建各种环境、SpringBoot整合各种中间件、如何把应用部署到云容器,本文开始一个新系列。系列文章完整目录

相信大部分业务都有这样的场景,消费一种MQ消息->转换为另外一种数据格式->转存到DB/REDIS/ES/COS或者推送到另外一个MQ,俗称数据搬运。但由于历史技术债务或者多方团队合作的等原因,项目对接的业务使用的数据格式存在很大差异,需要做不同程度的数据转换,例如:
在这里插入图片描述

也就是说,我们需要具备以下数据格式转换能力来适配各业务诉求。

  • 字段名称修改
  • 字段打平或展开
  • 字段类型和值转换
  • 字段结构转换
  • 字段值过滤

很明显,如果硬编码去做这种数据搬运是乏味而低效的,我们需要一款工具去提效,它需要具备以下核心能力。

  • 动态配置数据输入源
  • 通过DSL描述JSON->JSON的转换逻辑(非硬编码,动态修改)
  • 动态配置输出目标

通过调研发现,云产品SCF+CKAFKA实现的消息转储已经很接近我们的诉求,但它提供的功能仅支持原格式转储,其它格式则需要自行编码开发,跟我们的目标不符。输入和输出的动态配置能力相对比较简单,可以通过封装组件来实现,核心的要素还是在于数据格式的转换。经过一轮Google,发现JOLT比较符合我们组当前的业务场景。
在这里插入图片描述

  • JSON to JSON
  • JSON 结构转换
  • Kafka 转存到 ES
  • Kafka 转存到 DB
  • DB 数据转KAFKA消息
  • DB 数据转存ES

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • Jolt 0.1.5
  • idea 2020

三、Jolt 是什么?

1、JOLT是什么?

JSON to JSON transformation library written in Java where the “specification” for the transform is itself a JSON document.

    1. Transforming JSON data from ElasticSearch, MongoDb, Cassandra, etc before sending it off to the world
    1. Extracting data from a large JSON documents for your own consumption

Jolt 实则上是一个类库,在当今大数据场景主流的ETL pipeline 工具如NIFI和 StreamSets都支持Jolt作为Json转换插件,还有我们熟悉的camel等,它支持我们使用特定的DSL来进行JSON to JSON转换。

2、JOLT支持哪些转换?

shift: copy data from the input tree and put it the output treedefault     : apply default values to the treeremove      : remove data from the treesort : sort the Map key values alphabetically ( for debugging and human readability )cardinality : "fix" the cardinality of input data.  Eg, the "urls" element is usually a List, but if there is only one, then it is a String

Jolt支持复制、删除、修改、排序、对象和数组转换、设置默认值等转换能力,如果上述能力满足不了你的诉求,还可以自定义Java类全路径名称,实现Transform或ContextualTransform接口,定义转换逻辑。甚至可用SpecDriven接口来标记作为Jolt的新spec能力。

四、JOLT实现数据转换

前面提到,Jolt是一个工具类库,它的使用方式比较简单。只需要提前编写好DSL,几行代码就可以实现数据格式转换。

public static void main(String[] args) throws FileNotFoundException {    // DSL转换描述文件和输入源json文件    File spec = ResourceUtils.getFile("classpath:spec.json");    File input = ResourceUtils.getFile("classpath:input.json");    // 构造转换器    List chainrSpecJSON = JsonUtils.streamToType(new FileInputStream(spec), new TypeReference<List>() {});    Chainr chainr = Chainr.fromSpec(chainrSpecJSON);    // 构造输入源对象    Object inputJSON = JsonUtils.streamToType(new FileInputStream(input), new TypeReference<Map>() {});    // 转换输入源对象为目标格式对象    Object transformedOutput = chainr.transform(inputJSON);    System.out.println(JsonUtils.toJsonString(transformedOutput));}

下面通过实战方式解决本文提到5个数据转换问题,截图左边栏为input.json,中间为DSL,右边为输出结果。

  • 字段名称修改:修改id名称
    在这里插入图片描述
  • 字段打平或展开:将字段打平到salesInfo
    在这里插入图片描述
  • 字段值转换:Boolean转换为1或者2
    在这里插入图片描述
  • 字段结构转换:数组改为对象
    在这里插入图片描述
  • 字段值过滤:根据字段值过滤
    在这里插入图片描述
    除了支持以上转换方式,实际上Jolt还支持更多方式,如自定义代码,删除字段等,这里就不一一举例。
[  {    "operation": "com.mmc.demo.jolt.CustomTransform", // 自定义代码    "spec": {      "rating": { "primary": {   "value": "Rating" }, "*": {   "value": "SecondaryRatings.&1.Value",   "$": "SecondaryRatings.&1.Id" }      }    }  }]

五、总体设计

我们的目标是尽可能少编码甚至无需编码就能实现数据转存,开箱即用。但前文已经提到,Jolt侧重于结构的转换,所以我们需要做一些二次封装增强转换能力。同时,由于输出目的地种类较多,在设计上,通过插件化方式提供选择转存目的地。
在这里插入图片描述

参考资料

  • GitHub:https://github.com/bazaarvoice/jolt
  • Demo:http://jolt-demo.appspot.com/#mapToList
  • Stack Overflow:https://stackoverflow.com/questions/1618038/xslt-equivalent-for-json
  • Blog:https://cloud.tencent.com/developer/article/1690416
  • Doc:Jolt PPT 介绍

六、小结

至此,本文介绍了ETL工具项目的背景和总体设计,编码过程和使用方式将在后续文章中慢慢介绍,有兴趣的朋友可以关注一波。下一篇《搭建大型分布式服务(三十二)基于JOLT的ETL工具 - Kafka消息转存到ES》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

搭建大型分布式服务(三十一)基于JOLT实现业务型轻量级ETL工具

搭建大型分布式服务(三十一)基于JOLT实现业务型轻量级ETL工具 《新程序员》:云原生和全面数字化实践 搭建大型分布式服务(三十一)基于JOLT实现业务型轻量级ETL工具 50位技术专家共同创作,文字、视频、音频交互阅读