手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
如何利用 Apache Flink 结合 CDC(Change Data Capture,变更数据捕获)技术,将 MySQL 的数据实时导入 StarRocks,打造高效的实时数仓。这不仅是企业数字化转型的利器,也是技术人提升竞争力的绝佳实战场景!
在数据驱动的时代,实时性是企业的核心竞争力。传统的批量 ETL(抽取-转换-加载)方式往往因为延迟高、效率低而无法满足实时分析需求。而 Flink 作为流处理的王者,搭配 CDC 捕获 MySQL 的增量变更,再结合 StarRocks 的高性能分析能力,形成了一个强大的实时数据入湖方案。无论你是数据库工程师、数据分析师,还是对数仓建设感兴趣的初学者,这篇文章都将手把手带你完成从环境搭建到整库同步的实战流程。
通过这篇博文,你将学会如何安装 Flink 和 MySQL CDC 连接器,编写 YAML 文件实现整库同步,并将数据无缝导入 StarRocks。准备好服务器,泡杯咖啡,咱们一起开启这场实时入湖的实战之旅吧!
**准备好你的服务器,泡杯咖啡,咱们一起“上代码、上步骤、上实战”,开启这场实时入湖的硬核之旅!
**
文章目录
- 手把手教你用 Flink + CDC 实现 MySQL 数据实时导入 StarRocks(干货)
- 第一步:为什么选择 Flink + CDC + StarRocks?
- 第二步:环境准备与工具安装
-
- 具体配置
- 2.0 MySql 配置
-
- 验证是否是 binlog 模式
- 如果没有 需要 启动 binlog 模式
- 创建数据库和表 用来同步
- 2.1 java 安装
- 2.2 下载 Flink 1.20.1
- 2.3 解压 Flink 1.20.1
- 2.4 配置 Flink-1.20.1/conf/config.yaml
- 2.5 下载 Flink-CDC-3.3.0
- 2.6 解压 Flink-CDC-3.3.0
- 2.7 下载驱动 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
- 2.8 下载驱动 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
- 2.9 移到 Flink-cdc-3.3.0/lib 下
- 2.10 下载 MySQL JDBC 驱动
- 2.11 启动 Flink-1.20.1
- 2.12 网络和数据库连接 测试
- 2.13 创建 yaml
- 2.14 运行 yaml
- 最后验证 是否同步
-
- 查看starrocks
- 连接 192.168.5.128:8030 ,并输入 用户名密码
第一步:为什么选择 Flink + CDC + StarRocks?
Flink 是一个强大的流处理框架,擅长处理实时数据流,吞吐量高、延迟低,非常适合实时数仓场景。而 CDC 技术能捕获 MySQL 数据库的增量变更(比如插入、更新、删除操作),让我们无需全量扫描数据库,就能实时获取数据变化。至于 StarRocks,它是一个高性能的分析型数据库,查询速度快,支持实时分析场景。把这三者结合起来,简直是实时数据入湖的“黄金三角”!
第二步:环境准备与工具安装
先是vm 上 安装了 4台 linux centos 8 (如下图)
配置了 4台 VM 虚拟机
3台 安装 StarRocks
1台安装 mysql +flink+flinkcdc
具体可以参考Streaming ELT 同步 MySQL 到 StarRocks
但官方的例子 需要安装 docker 。当你真实配置你会发现,环境各有不同 ,不同
java 版本 ,Flink 和 cdc 用什么版本 ,有没有什么依赖性都需要考虑 问题。
包括 mysql 和 starocks 版本等.
我在实践中也出现的很多问题,经过很多尝试最终完成.
具体配置
lib 目录,用于捕获 MySQL 变更数据说明
Flink 和 Flink CDC:均部署在 192.168.5.131,Flink CDC 连接器通常作为 JAR 文件放置在 Flink 的 lib 目录下,与 Flink 共享同一节点。
MySQL:与 Flink 部署在同一服务器(192.168.5.131),需确保 Binlog 已启用。
StarRocks:分布式部署,FE 和一个 BE 节点在 192.168.5.128,另外两个 BE 节点分别在 192.168.5.129 和 192.168.5.130,形成一个典型的多节点集群。
网络要求:确保所有 IP 地址之间网络互通,特别是 MySQL(3306 端口)、Flink(8081 等端口)、StarRocks(8030、9030 等端口)需开放相关端口。
关于 怎么安装 starrocks 可以访问 Starrocks 中文论坛
关于 怎么安装 mysql 可以访问 这篇三步搞定 mysql 8.0的安装
本案列不需要安装 Docker

CentOS8_cd_Flink
可以看到 配置不高,主要用来完成这个实验 ,

如果是生产环境建议以下配置
Flink 是一个资源密集型的流处理框架,对 CPU、内存、磁盘和网络有一定要求。以下是推荐的硬件配置:
开发/测试环境(单节点或小型集群)
: CPU:4 核 ~ 8 核(如 Intel Xeon 或 AMD EPYC,推荐 2.5 GHz
以上)。 内存:16 GB ~ 32 GB(Flink 作业和 JVM 堆内存需至少 8 GB)。 磁盘:500 GB
SSD(用于存储检查点、日志和临时数据)。 网络:千兆网卡(1 Gbps),确保低延迟数据传输。
生产环境(分布式集群):
CPU:16 核 ~ 32 核 per TaskManager(推荐多核 CPU 以支持高并行度)。 内存:64 GB ~ 128 GB
per TaskManager(建议为 Flink 分配 70%~80% 的内存,剩余用于操作系统)。 磁盘:1 TB ~ 2 TB
NVMe SSD(高 IOPS,适合检查点和状态存储)。 网络:万兆网卡(10 Gbps),支持高吞吐量数据传输。 节点数:至少 3
个节点(1 个 JobManager + 2 个 TaskManager),可根据任务规模扩展。
2.0 MySql 配置
验证是否是 binlog 模式
SHOW VARIABLES LIKE \'log_bin\'; SHOW VARIABLES LIKE \'%binlog%\';


如果没有 需要 启动 binlog 模式
# /etc/my.cnf[mysqld]log-bin=mysql-bin # 启用binlog-format=ROW
– 1. 检查 binlog 是否启用 SHOW VARIABLES LIKE ‘log_bin’; – 必须是 ON
– 2. 设置格式为 ROW(最重要) SET GLOBAL binlog_format = ‘ROW’;
– 3. 设置合理的过期时间 SET GLOBAL binlog_expire_logs_seconds = 604800; – 7 天
创建数据库和表 用来同步
-- 创建数据库CREATE DATABASE app_db;USE app_db;-- 创建 orders 表CREATE TABLE `orders` (`id` INT NOT NULL,`price` DECIMAL(10,2) NOT NULL,PRIMARY KEY (`id`));-- 插入数据INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表CREATE TABLE `shipments` (`id` INT NOT NULL,`city` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`));-- 插入数据INSERT INTO `shipments` (`id`, `city`) VALUES (1, \'beijing\');INSERT INTO `shipments` (`id`, `city`) VALUES (2, \'xian\');-- 创建 products 表CREATE TABLE `products` (`id` INT NOT NULL,`product` VARCHAR(255) NOT NULL,PRIMARY KEY (`id`));-- 插入数据INSERT INTO `products` (`id`, `product`) VALUES (1, \'Beer\');INSERT INTO `products` (`id`, `product`) VALUES (2, \'Cap\');INSERT INTO `products` (`id`, `product`) VALUES (3, \'Peanut\');
2.1 java 安装
可以看到 java 已经安装好 版本 11

2.2 下载 Flink 1.20.1
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz

2.3 解压 Flink 1.20.1
tar -xzf flink-1.20.1-bin-scala_2.12.tgzcd flink-1.20.1ll


2.4 配置 Flink-1.20.1/conf/config.yaml
vim flink-1.20.1/conf/config.yaml改为 rest.address: 0.0.0.0改为 rest.bind-address: 0.0.0.0
rest.address 和 rest.bind-address 是与 Flink 的 REST API 和 Web UI 相关的配置项,用于控制 Flink JobManager 的 REST 服务监听地址。这些配置决定了 Flink 的 Web UI 和客户端如何访问 JobManager。

2.5 下载 Flink-CDC-3.3.0
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz

2.6 解压 Flink-CDC-3.3.0
tar -xzf flink-cdc-3.3.0-bin.tar.gzcd flink-cdc-3.3.0

2.7 下载驱动 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
wget \"https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar?Expires=1753349291&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=jnckjewN8vqzaE3tbupo691o8YY%3D\" -O lib/flink-cdc-pipeline-connector-mysql-3.3.0.jar
2.8 下载驱动 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
wget \"https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.3.0/flink-cdc-pipeline-connector-starrocks-3.3.0.jar?Expires=1753349371&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=Fd0JxnlDxr1nKkP8wOoIHHhGV2c%3D\" -O lib/flink-cdc-pipeline-connector-starrocks-3.3.0.jar
2.9 移到 Flink-cdc-3.3.0/lib 下

2.10 下载 MySQL JDBC 驱动
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
2.11 启动 Flink-1.20.1
./flink-1.20.1/bin/start-cluster.sh

打开 网页
192.168.5.131 :8081,如果可以打开说明启动成功

点击 Task Managers ,说明基本正常

2.12 网络和数据库连接 测试
ping 192.168.5.131ping 192.168.5.128telnet 192.168.5.131 3306telnet 192.168.5.128 9030

2.13 创建 yaml

source: type: mysql hostname: 192.168.5.131 # 修改为实际 MySQL 地址 port: 3306 username: root password: 123456 tables: app_db.\\.* #app_db修改为库名 server-id: 5400-5404sink: type: starrocks jdbc-url: jdbc:mysql://192.168.5.128:9030 # 修改为实际 StarRocks 地址 load-url: 192.168.5.128:8030 username: root password: 123456 table.create.properties.replication_num: 1pipeline: name: MySQL to StarRocks Pipeline parallelism: 1
source:
type: mysql
hostname: 192.168.5.131 # MySQL 服务器 IP 地址
port: 3306 # MySQL 端口
username: root # 连接
MySQL 的用户名
password: 123456 # 连接 MySQL 的密码 tables:
app_db…* # 需要同步的表,支持正则,这里是 app_db 库下的所有表
server-id:
5400-5404 # MySQL binlog server_id 范围(Flink CDC 会随机选一个)
sink:
type: starrocks
jdbc-url: jdbc:mysql://192.168.5.128:9030 # StarRocks JDBC 连接地址
load-url: 192.168.5.128:8030 # StarRocks Stream Load 地址
username: root # StarRocks 用户名
password: 123456 # StarRocks 密码
table.create.properties.replication_num: 1 # 表副本数设置为1
pipeline:
name: MySQL to StarRocks Pipeline # 管道名称
parallelism: 1 # 并行度设置为1
2.14 运行 yaml
因为有依赖关系 需要放在 flink-cdc-3.3.0 目录下面

./bin/flink-cdc.sh --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml
./bin/flink-cdc.sh - 这是 Flink CDC 的启动脚本
–flink-home /root/flink-1.20.1 - 指定 Flink 的安装目录
mysql-to-starrocks-pipeline.yaml - 配置文件路径

页面如下


设置
./bin/flink-cdc.sh -Dexecution.checkpointing.interval=3000 --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml
-Dexecution.checkpointing.interval=3000
-D: 这是 JVM 参数前缀,用于设置系统属性
execution.checkpointing.interval: Flink 配置参数名
3000: 时间间隔,单位是毫秒,即 3000ms = 3秒
最后验证 是否同步
查看starrocks
可以从图中看到原来 starrocks 是都没有数据库的
现在有数据库,表也自动同步好了。

连接 192.168.5.128:8030 ,并输入 用户名密码




