> 技术文档 > 手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)


手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

如何利用 Apache Flink 结合 CDC(Change Data Capture,变更数据捕获)技术,将 MySQL 的数据实时导入 StarRocks,打造高效的实时数仓。这不仅是企业数字化转型的利器,也是技术人提升竞争力的绝佳实战场景!

在数据驱动的时代,实时性是企业的核心竞争力。传统的批量 ETL(抽取-转换-加载)方式往往因为延迟高、效率低而无法满足实时分析需求。而 Flink 作为流处理的王者,搭配 CDC 捕获 MySQL 的增量变更,再结合 StarRocks 的高性能分析能力,形成了一个强大的实时数据入湖方案。无论你是数据库工程师、数据分析师,还是对数仓建设感兴趣的初学者,这篇文章都将手把手带你完成从环境搭建到整库同步的实战流程。

通过这篇博文,你将学会如何安装 Flink 和 MySQL CDC 连接器,编写 YAML 文件实现整库同步,并将数据无缝导入 StarRocks。准备好服务器,泡杯咖啡,咱们一起开启这场实时入湖的实战之旅吧!

**准备好你的服务器,泡杯咖啡,咱们一起“上代码、上步骤、上实战”,开启这场实时入湖的硬核之旅!
**


文章目录

  • 手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
  • 第一步:为什么选择 Flink + CDC + StarRocks?
  • 第二步:环境准备与工具安装
    • 具体配置
    • 2.0 MySql 配置
      • 验证是否是 binlog 模式
      • 如果没有 需要 启动 binlog 模式
      • 创建数据库和表 用来同步
    • 2.1 java 安装
    • 2.2 下载 Flink 1.20.1
    • 2.3 解压 Flink 1.20.1
    • 2.4 配置 Flink-1.20.1/conf/config.yaml
    • 2.5 下载 Flink-CDC-3.3.0
    • 2.6 解压 Flink-CDC-3.3.0
    • 2.7 下载驱动 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
    • 2.8 下载驱动 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
    • 2.9 移到 Flink-cdc-3.3.0/lib 下
    • 2.10 下载 MySQL JDBC 驱动
    • 2.11 启动 Flink-1.20.1
    • 2.12 网络和数据库连接 测试
    • 2.13 创建 yaml
    • 2.14 运行 yaml
  • 最后验证 是否同步
    • 查看starrocks
    • 连接 192.168.5.128:8030 ,并输入 用户名密码

第一步:为什么选择 Flink + CDC + StarRocks?

Flink 是一个强大的流处理框架,擅长处理实时数据流,吞吐量高、延迟低,非常适合实时数仓场景。而 CDC 技术能捕获 MySQL 数据库的增量变更(比如插入、更新、删除操作),让我们无需全量扫描数据库,就能实时获取数据变化。至于 StarRocks,它是一个高性能的分析型数据库,查询速度快,支持实时分析场景。把这三者结合起来,简直是实时数据入湖的“黄金三角”!


第二步:环境准备与工具安装

先是vm 上 安装了 4台 linux centos 8 (如下图)
配置了 4台 VM 虚拟机
3台 安装 StarRocks
1台安装 mysql +flink+flinkcdc

具体可以参考Streaming ELT 同步 MySQL 到 StarRocks

但官方的例子 需要安装 docker 。当你真实配置你会发现,环境各有不同 ,不同
java 版本 ,Flink 和 cdc 用什么版本 ,有没有什么依赖性都需要考虑 问题。
包括 mysql 和 starocks 版本等.
我在实践中也出现的很多问题,经过很多尝试最终完成.

具体配置

组件 安装 IP 地址 角色/说明 Apache Flink 192.168.5.131 Flink 集群(JobManager + TaskManager) Flink CDC 连接器 192.168.5.131 部署于 Flink 的 lib 目录,用于捕获 MySQL 变更数据 MySQL 192.168.5.131 源数据库,提供数据并启用 Binlog StarRocks FE + BE 192.168.5.128 StarRocks 前端(FE)+ 后端(BE) StarRocks BE 192.168.5.129 StarRocks 后端(BE)节点 2 StarRocks BE 192.168.5.130 StarRocks 后端(BE)节点 3

说明
Flink 和 Flink CDC:均部署在 192.168.5.131,Flink CDC 连接器通常作为 JAR 文件放置在 Flink 的 lib 目录下,与 Flink 共享同一节点。

MySQL:与 Flink 部署在同一服务器(192.168.5.131),需确保 Binlog 已启用。

StarRocks:分布式部署,FE 和一个 BE 节点在 192.168.5.128,另外两个 BE 节点分别在 192.168.5.129 和 192.168.5.130,形成一个典型的多节点集群。

网络要求:确保所有 IP 地址之间网络互通,特别是 MySQL(3306 端口)、Flink(8081 等端口)、StarRocks(8030、9030 等端口)需开放相关端口。

关于 怎么安装 starrocks 可以访问 Starrocks 中文论坛
关于 怎么安装 mysql 可以访问 这篇三步搞定 mysql 8.0的安装

本案列不需要安装 Docker
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

CentOS8_cd_Flink
可以看到 配置不高,主要用来完成这个实验 ,
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
如果是生产环境建议以下配置
Flink 是一个资源密集型的流处理框架,对 CPU、内存、磁盘和网络有一定要求。以下是推荐的硬件配置:

开发/测试环境(单节点或小型集群)

: CPU:4 核 ~ 8 核(如 Intel Xeon 或 AMD EPYC,推荐 2.5 GHz
以上)。 内存:16 GB ~ 32 GB(Flink 作业和 JVM 堆内存需至少 8 GB)。 磁盘:500 GB
SSD(用于存储检查点、日志和临时数据)。 网络:千兆网卡(1 Gbps),确保低延迟数据传输。

生产环境(分布式集群):

CPU:16 核 ~ 32 核 per TaskManager(推荐多核 CPU 以支持高并行度)。 内存:64 GB ~ 128 GB
per TaskManager(建议为 Flink 分配 70%~80% 的内存,剩余用于操作系统)。 磁盘:1 TB ~ 2 TB
NVMe SSD(高 IOPS,适合检查点和状态存储)。 网络:万兆网卡(10 Gbps),支持高吞吐量数据传输。 节点数:至少 3
个节点(1 个 JobManager + 2 个 TaskManager),可根据任务规模扩展。

2.0 MySql 配置

验证是否是 binlog 模式

SHOW VARIABLES LIKE \'log_bin\'; SHOW VARIABLES LIKE \'%binlog%\'; 

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

如果没有 需要 启动 binlog 模式

# /etc/my.cnf[mysqld]log-bin=mysql-bin # 启用binlog-format=ROW

– 1. 检查 binlog 是否启用 SHOW VARIABLES LIKE ‘log_bin’; – 必须是 ON

– 2. 设置格式为 ROW(最重要) SET GLOBAL binlog_format = ‘ROW’;

– 3. 设置合理的过期时间 SET GLOBAL binlog_expire_logs_seconds = 604800; – 7 天

创建数据库和表 用来同步

-- 创建数据库CREATE DATABASE app_db;USE app_db;-- 创建 orders 表CREATE TABLE `orders` (`id` INT NOT NULL,`price` DECIMAL(10,2) NOT NULL,PRIMARY KEY (`id`));-- 插入数据INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表CREATE TABLE `shipments` (`id` INT NOT NULL,`city` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`));-- 插入数据INSERT INTO `shipments` (`id`, `city`) VALUES (1, \'beijing\');INSERT INTO `shipments` (`id`, `city`) VALUES (2, \'xian\');-- 创建 products 表CREATE TABLE `products` (`id` INT NOT NULL,`product` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`));-- 插入数据INSERT INTO `products` (`id`, `product`) VALUES (1, \'Beer\');INSERT INTO `products` (`id`, `product`) VALUES (2, \'Cap\');INSERT INTO `products` (`id`, `product`) VALUES (3, \'Peanut\');

2.1 java 安装

可以看到 java 已经安装好 版本 11
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.2 下载 Flink 1.20.1

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.3 解压 Flink 1.20.1

tar -xzf flink-1.20.1-bin-scala_2.12.tgzcd flink-1.20.1ll

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.4 配置 Flink-1.20.1/conf/config.yaml

vim flink-1.20.1/conf/config.yaml改为 rest.address: 0.0.0.0改为 rest.bind-address: 0.0.0.0

rest.address 和 rest.bind-address 是与 Flink 的 REST API 和 Web UI 相关的配置项,用于控制 Flink JobManager 的 REST 服务监听地址。这些配置决定了 Flink 的 Web UI 和客户端如何访问 JobManager。
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.5 下载 Flink-CDC-3.3.0

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.6 解压 Flink-CDC-3.3.0

tar -xzf flink-cdc-3.3.0-bin.tar.gzcd flink-cdc-3.3.0

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.7 下载驱动 Flink-cdc-pipeline-connector-mysql-3.3.0.jar

wget \"https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar?Expires=1753349291&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=jnckjewN8vqzaE3tbupo691o8YY%3D\" -O lib/flink-cdc-pipeline-connector-mysql-3.3.0.jar

2.8 下载驱动 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar

wget \"https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.3.0/flink-cdc-pipeline-connector-starrocks-3.3.0.jar?Expires=1753349371&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=Fd0JxnlDxr1nKkP8wOoIHHhGV2c%3D\" -O lib/flink-cdc-pipeline-connector-starrocks-3.3.0.jar

2.9 移到 Flink-cdc-3.3.0/lib 下

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.10 下载 MySQL JDBC 驱动

wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar

2.11 启动 Flink-1.20.1

./flink-1.20.1/bin/start-cluster.sh

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

打开 网页
192.168.5.131 :8081,如果可以打开说明启动成功
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

点击 Task Managers ,说明基本正常

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.12 网络和数据库连接 测试

ping 192.168.5.131ping 192.168.5.128telnet 192.168.5.131 3306telnet 192.168.5.128 9030

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

2.13 创建 yaml

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

source: type: mysql hostname: 192.168.5.131 # 修改为实际 MySQL 地址 port: 3306 username: root password: 123456 tables: app_db.\\.* #app_db修改为库名 server-id: 5400-5404sink: type: starrocks jdbc-url: jdbc:mysql://192.168.5.128:9030 # 修改为实际 StarRocks 地址 load-url: 192.168.5.128:8030 username: root password: 123456 table.create.properties.replication_num: 1pipeline: name: MySQL to StarRocks Pipeline parallelism: 1

source:
type: mysql
hostname: 192.168.5.131 # MySQL 服务器 IP 地址
port: 3306 # MySQL 端口
username: root # 连接
MySQL 的用户名
password: 123456 # 连接 MySQL 的密码 tables:
app_db…* # 需要同步的表,支持正则,这里是 app_db 库下的所有表
server-id:
5400-5404 # MySQL binlog server_id 范围(Flink CDC 会随机选一个)

sink:
type: starrocks
jdbc-url: jdbc:mysql://192.168.5.128:9030 # StarRocks JDBC 连接地址
load-url: 192.168.5.128:8030 # StarRocks Stream Load 地址
username: root # StarRocks 用户名
password: 123456 # StarRocks 密码
table.create.properties.replication_num: 1 # 表副本数设置为1

pipeline:
name: MySQL to StarRocks Pipeline # 管道名称
parallelism: 1 # 并行度设置为1

2.14 运行 yaml

因为有依赖关系 需要放在 flink-cdc-3.3.0 目录下面
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

./bin/flink-cdc.sh --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml

./bin/flink-cdc.sh - 这是 Flink CDC 的启动脚本
–flink-home /root/flink-1.20.1 - 指定 Flink 的安装目录
mysql-to-starrocks-pipeline.yaml - 配置文件路径

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

页面如下
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
设置

./bin/flink-cdc.sh -Dexecution.checkpointing.interval=3000 --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml

-Dexecution.checkpointing.interval=3000
-D: 这是 JVM 参数前缀,用于设置系统属性
execution.checkpointing.interval: Flink 配置参数名
3000: 时间间隔,单位是毫秒,即 3000ms = 3秒

最后验证 是否同步

查看starrocks

可以从图中看到原来 starrocks 是都没有数据库的
现在有数据库,表也自动同步好了。
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)

连接 192.168.5.128:8030 ,并输入 用户名密码

手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)