搭建大型分布式服务(三十三)十分钟搭SpringBoot积木 - 完成Kafka消息转存到DB
系列文章目录
文章目录
- 系列文章目录
- 前言
-
-
- 一、本文要点
- 二、开发环境
- 三、容器模式
- 四、小结
-
前言
玩转SpringBoot,像搭积木一样简单。消费kafka消息,然后把数据存储到DB,这是一个很典型的业务了,相信每个小伙伴们都有类似的需求。遇到这类需求,大部分的做法都是从零开始搭建一个消费工程。这样的重复劳作无疑是非常的繁琐的,但如果我们只是仅仅把kafka消息转换成另外一个格式存储到DB,有没有通用的架构呢?
一、本文要点
前面的文章,介绍了基于Jolt如何实现业务型的ETL工具,本文将介绍这款工具的实际应用场景,如何快速将kafka消息转存到db。系列文章完整目录
- JSON to JSON
- JSON 结构转换
- Kafka 转存到 ES
- Kafka 转存到 DB
- DB 数据转KAFKA消息
- DB 数据转存ES
- 低代码平台
- SpringBoot积木
二、开发环境
- jdk 1.8
- maven 3.6.2
- springboot 2.4.3
- Jolt 0.1.5
- kafka 2.0
- es 6.8.2
- idea 2020
三、容器模式
1、新建容器
使用制作好的etl镜像,创建工作负载,参考文章:
《搭建大型分布式服务(二十五)如何将应用部署到TKE容器集群?》
《搭建大型分布式服务(二十七)如何通过Coding流水线CI/CD将SpringBoot服务部署到TKE容器集群》
其中SPRING_EXT变量可以自定义profile的值,指定加载applicationn-xxx.properties文件的配置。
参考:《搭建大型分布式服务(三)SpringBoot多环境配置》
2、配置同步参数
(1)可以直接在Apollo配置中心直接修改profile对应环境的application.properties文件配置信息。
参考:《搭建大型分布式服务(十七)SpringBoot 配置托管到Apollo》
(2)也可以直接利用SpringBoot配置文件加载顺序原因,将文件application-xxx.properties挂载到容器config/application-xxx.properties目录,覆盖容器目录中的文件。
## 公共配置spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driverspring.datasource.type=com.zaxxer.hikari.HikariDataSource# 输入 #spring.jolt.input.type=kafkaspring.jolt.input.kafka.topic=book-topicspring.jolt.input.kafka.group-id=group_sync_to_dbspring.jolt.input.kafka.concurrency=1spring.jolt.input.kafka.consumer.bootstrapServers=127.0.0.1:9092spring.jolt.input.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.jolt.input.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.jolt.input.kafka.consumer.max-poll-records=50spring.jolt.input.kafka.consumer.group-id=group_sync_to_dbspring.jolt.input.kafka.consumer.auto-offset-reset=earliest# 转换 #spring.jolt.spec.value=[{"operation":"shift","spec":{"data":{"book_id":"bookId", "book_name":"bookName"}}}]spring.jolt.spec.filter=# 输出 DBspring.jolt.output.db.enabled=truespring.jolt.output.db.hikari.jdbc-url=jdbc:mysql://127.0.0.1:3306/book?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8spring.jolt.output.db.hikari.username=writerspring.jolt.output.db.hikari.password=spring.jolt.output.db.hikari.minimum-idle=1spring.jolt.output.db.hikari.maximum-pool-size=15spring.jolt.output.db.hikari.auto-commit=truespring.jolt.output.db.hikari.idle-timeout=30000spring.jolt.output.db.hikari.pool-name=primary_poolspring.jolt.output.db.hikari.max-lifetime=1800000spring.jolt.output.db.hikari.connection-timeout=30001spring.jolt.output.db.hikari.connection-test-query=SELECT 1 FROM DUALspring.jolt.output.db.hikari.connection-init-sql=set names utf8mb4spring.jolt.output.db.table-name=t_bookspring.jolt.output.db.exclude-fields=spring.jolt.output.db.update-enabled=true
spring.jolt.spec.value
是基于jolt转换逻辑的核心配置项,在这可以不用写任何java代码就可以实现kafka消息转换成db存储格式。
3、完成配置后,启动容器,kafka消息转存db任务就已经正常运行。可以发送一条kafka消息,验证一下。
四、小结
至此,简单几步,本文就实现了kafka消息按任意格式转存到es了。下一篇《搭建大型分布式服务(三十四)基于JOLT的ETL工具 - Kafka消息转为Kafka消息》
加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你
![]() |
![]() |
![]() |