> 文档中心 > 搭建大型分布式服务(三十三)十分钟搭SpringBoot积木 - 完成Kafka消息转存到DB

搭建大型分布式服务(三十三)十分钟搭SpringBoot积木 - 完成Kafka消息转存到DB


系列文章目录

搭建大型分布式服务(三十三)十分钟搭SpringBoot积木 - 完成Kafka消息转存到DB


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、容器模式
      • 四、小结

前言

玩转SpringBoot,像搭积木一样简单。消费kafka消息,然后把数据存储到DB,这是一个很典型的业务了,相信每个小伙伴们都有类似的需求。遇到这类需求,大部分的做法都是从零开始搭建一个消费工程。这样的重复劳作无疑是非常的繁琐的,但如果我们只是仅仅把kafka消息转换成另外一个格式存储到DB,有没有通用的架构呢?
搭建大型分布式服务(三十三)十分钟搭SpringBoot积木 - 完成Kafka消息转存到DB


一、本文要点

前面的文章,介绍了基于Jolt如何实现业务型的ETL工具,本文将介绍这款工具的实际应用场景,如何快速将kafka消息转存到db。系列文章完整目录

  • JSON to JSON
  • JSON 结构转换
  • Kafka 转存到 ES
  • Kafka 转存到 DB
  • DB 数据转KAFKA消息
  • DB 数据转存ES
  • 低代码平台
  • SpringBoot积木

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • Jolt 0.1.5
  • kafka 2.0
  • es 6.8.2
  • idea 2020

三、容器模式

1、新建容器
使用制作好的etl镜像,创建工作负载,参考文章:
《搭建大型分布式服务(二十五)如何将应用部署到TKE容器集群?》
《搭建大型分布式服务(二十七)如何通过Coding流水线CI/CD将SpringBoot服务部署到TKE容器集群》
在这里插入图片描述
其中SPRING_EXT变量可以自定义profile的值,指定加载applicationn-xxx.properties文件的配置。
参考:《搭建大型分布式服务(三)SpringBoot多环境配置》

2、配置同步参数
(1)可以直接在Apollo配置中心直接修改profile对应环境的application.properties文件配置信息。
参考:《搭建大型分布式服务(十七)SpringBoot 配置托管到Apollo》
(2)也可以直接利用SpringBoot配置文件加载顺序原因,将文件application-xxx.properties挂载到容器config/application-xxx.properties目录,覆盖容器目录中的文件。

## 公共配置spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driverspring.datasource.type=com.zaxxer.hikari.HikariDataSource# 输入 #spring.jolt.input.type=kafkaspring.jolt.input.kafka.topic=book-topicspring.jolt.input.kafka.group-id=group_sync_to_dbspring.jolt.input.kafka.concurrency=1spring.jolt.input.kafka.consumer.bootstrapServers=127.0.0.1:9092spring.jolt.input.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.jolt.input.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.jolt.input.kafka.consumer.max-poll-records=50spring.jolt.input.kafka.consumer.group-id=group_sync_to_dbspring.jolt.input.kafka.consumer.auto-offset-reset=earliest# 转换 #spring.jolt.spec.value=[{"operation":"shift","spec":{"data":{"book_id":"bookId", "book_name":"bookName"}}}]spring.jolt.spec.filter=# 输出  DBspring.jolt.output.db.enabled=truespring.jolt.output.db.hikari.jdbc-url=jdbc:mysql://127.0.0.1:3306/book?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8spring.jolt.output.db.hikari.username=writerspring.jolt.output.db.hikari.password=spring.jolt.output.db.hikari.minimum-idle=1spring.jolt.output.db.hikari.maximum-pool-size=15spring.jolt.output.db.hikari.auto-commit=truespring.jolt.output.db.hikari.idle-timeout=30000spring.jolt.output.db.hikari.pool-name=primary_poolspring.jolt.output.db.hikari.max-lifetime=1800000spring.jolt.output.db.hikari.connection-timeout=30001spring.jolt.output.db.hikari.connection-test-query=SELECT 1 FROM DUALspring.jolt.output.db.hikari.connection-init-sql=set names utf8mb4spring.jolt.output.db.table-name=t_bookspring.jolt.output.db.exclude-fields=spring.jolt.output.db.update-enabled=true

spring.jolt.spec.value 是基于jolt转换逻辑的核心配置项,在这可以不用写任何java代码就可以实现kafka消息转换成db存储格式。

3、完成配置后,启动容器,kafka消息转存db任务就已经正常运行。可以发送一条kafka消息,验证一下。
搭建大型分布式服务(三十三)十分钟搭SpringBoot积木 - 完成Kafka消息转存到DB

四、小结

至此,简单几步,本文就实现了kafka消息按任意格式转存到es了。下一篇《搭建大型分布式服务(三十四)基于JOLT的ETL工具 - Kafka消息转为Kafka消息》

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

搭建大型分布式服务(三十三)十分钟搭SpringBoot积木 - 完成Kafka消息转存到DB