> 文档中心 > 搭建大型分布式服务(三十二)基于JOLT的ETL工具 - Kafka消息转存到ES

搭建大型分布式服务(三十二)基于JOLT的ETL工具 - Kafka消息转存到ES


系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、容器模式
      • 四、小结

前言

将kafka消息转存到es,提供业务方检索,这是一种很常见的业务场景。例如数据检索、日志ELK等等。但假如kafka的消息结构和es的数据结构相差比较大,怎样办呢?硬编码去做字段映射?本文介绍一个便捷的方式,15分钟实现kafka消息转存es。


一、本文要点

前面的文章,介绍了基于Jolt如何实现业务型的ETL工具,本文将介绍这款工具的实际应用场景,如何快速将kafka消息转存到es。系列文章完整目录

  • JSON to JSON
  • JSON 结构转换
  • Kafka 转存到 ES
  • Kafka 转存到 DB
  • DB 数据转KAFKA消息
  • DB 数据转存ES

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • Jolt 0.1.5
  • kafka 2.0
  • es 6.8.2
  • idea 2020

三、容器模式

1、新建容器
使用制作好的etl镜像,创建工作负载,参考文章:
《搭建大型分布式服务(二十五)如何将应用部署到TKE容器集群?》
《搭建大型分布式服务(二十七)如何通过Coding流水线CI/CD将SpringBoot服务部署到TKE容器集群》
在这里插入图片描述
其中SPRING_EXT变量可以自定义profile的值,指定加载applicationn-xxx.properties文件的配置。
参考:《搭建大型分布式服务(三)SpringBoot多环境配置》

2、配置同步参数
(1)可以直接在Apollo配置中心直接修改profile对应环境的application.properties文件配置信息。
参考:《搭建大型分布式服务(十七)SpringBoot 配置托管到Apollo》
(2)也可以直接利用SpringBoot配置文件加载顺序原因,将文件application-xxx.properties挂载到容器config/application-xxx.properties目录,覆盖容器目录中的文件。

## 公共配置# 这里没用到DB,先禁用spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration################### 输入 ###################spring.jolt.input.type=kafkaspring.jolt.input.kafka.topic=book-topicspring.jolt.input.kafka.group-id=group_sync_to_esspring.jolt.input.kafka.concurrency=1spring.jolt.input.kafka.consumer.bootstrapServers=127.0.0.1:9092spring.jolt.input.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.jolt.input.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.jolt.input.kafka.consumer.max-poll-records=50spring.jolt.input.kafka.consumer.group-id=group_sync_to_esspring.jolt.input.kafka.consumer.auto-offset-reset=earliest################### 转换 ###################spring.jolt.spec.value=[{"operation":"shift","spec":{"data":{"book_id":"bookId"}}}]spring.jolt.spec.filter=################### 输出 ##################### ESspring.jolt.output.es.enabled=truespring.jolt.output.es.ip=127.0.0.1:9200spring.jolt.output.es.user=elasticspring.jolt.output.es.passwd=spring.jolt.output.es.index-name=bookspring.jolt.output.es.index-type=_docspring.jolt.output.es.bulk-actions=1spring.jolt.output.es.index-id=bookId

spring.jolt.spec.value 是基于jolt转换逻辑的核心配置项,在这可以不用写任何java代码就可以实现kafka消息转换成es存储格式。

3、完成配置后,启动容器,kafka消息转存es任务就已经正常运行。可以发送一条kafka消息,验证一下。
在这里插入图片描述

四、小结

至此,简单几步,本文就实现了kafka消息按任意格式转存到es了。下一篇《搭建大型分布式服务(三十二)基于JOLT的ETL工具 - Kafka消息转存到DB》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

搭建大型分布式服务(三十二)基于JOLT的ETL工具 - Kafka消息转存到ES