Flink MySQL CDC 环境配置与验证_flink cdc mysql
一、MySQL 服务器配置详解
1. 启用二进制日志(Binlog)
MySQL CDC 依赖二进制日志获取增量数据,需在 MySQL 配置文件(my.cnf
或 my.ini
)中添加以下配置:
# 启用二进制日志log-bin=mysql-bin# 二进制日志格式(推荐ROW模式,记录行级变更)binlog-format=ROW# 启用GTID(高可用必备)gtid-mode=ONenforce-gtid-consistency=ON# 从库同步时记录binlog(主从架构需要)log-slave-updates=ON# 避免长连接超时(大表快照时需要)interactive_timeout=3600wait_timeout=3600
配置说明:
log-bin
:指定二进制日志文件名前缀,MySQL 会自动生成如mysql-bin.000001
的文件binlog-format=ROW
:相比 STATEMENT 模式,ROW 模式能精确记录每行数据的变更gtid-mode
:全局事务标识符,用于主从切换时保证数据一致性log-slave-updates
:若使用从库同步,需开启此配置让从库也记录 binlog
2. 创建专用用户并授权
-- 创建用户(替换为实际用户名和密码)CREATE USER \'flink_cdc\'@\'localhost\' IDENTIFIED BY \'flink123\';-- 授予必要权限(重要:REPLICATION SLAVE 用于读取binlog)GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO \'flink_cdc\'@\'localhost\';-- 刷新权限FLUSH PRIVILEGES;
权限说明:
SELECT
:读取表数据(快照阶段需要)SHOW DATABASES
:获取数据库列表(用于正则匹配监控库)REPLICATION SLAVE
:读取 binlog 必备权限REPLICATION CLIENT
:获取服务器状态(如binlog位置)
3. 配置唯一 Server ID
每个 Flink 作业需配置不同的 Server ID(避免 binlog 位置冲突):
# 在my.cnf中添加server-id=1001 # 任意唯一整数,建议范围5400-6400
说明:若 Flink 作业并行度为 N,则 Server ID 可设为范围(如 5400-5400+N
),例如:
-- Flink SQL 中通过Hints设置Server ID范围SELECT * FROM mysql_table /*+ OPTIONS(\'server-id\'=\'5401-5404\') */;
二、Flink 环境配置步骤
1. 添加依赖(Maven 项目)
在 pom.xml
中添加 MySQL CDC 连接器依赖:
<dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>3.0.1</version> <scope>provided</scope></dependency>
2. SQL Client 部署(非Maven环境)
- 下载连接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
- 将 JAR 包放入
$FLINK_HOME/lib/
目录 - 重启 Flink 集群使依赖生效
三、Flink MySQL CDC 表定义与参数详解
1. 完整建表示例(Flink SQL)
-- 设置checkpoint间隔(可选)SET \'execution.checkpointing.interval\' = \'3s\';-- 创建MySQL CDC表CREATE TABLE mysql_orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, -- 可选:添加元数据列 db_name STRING METADATA FROM \'database_name\' VIRTUAL, table_name STRING METADATA FROM \'table_name\' VIRTUAL, op_ts TIMESTAMP_LTZ(3) METADATA FROM \'op_ts\' VIRTUAL, row_kind STRING METADATA FROM \'row_kind\' VIRTUAL, PRIMARY KEY(order_id) NOT ENFORCED) WITH ( \'connector\' = \'mysql-cdc\', \'hostname\' = \'192.168.1.100\', \'port\' = \'3306\', \'username\' = \'flink_cdc\', \'password\' = \'flink123\', \'database-name\' = \'mydb\', \'table-name\' = \'orders\', -- 可选参数详解 \'server-id\' = \'5401\', \'scan.incremental.snapshot.enabled\' = \'true\', \'scan.incremental.snapshot.chunk.size\' = \'8096\', \'scan.startup.mode\' = \'initial\', \'heartbeat.interval\' = \'30s\', \'debezium.binary.handling.mode\' = \'base64\');
2. 核心参数详解
connector
mysql-cdc
hostname
username
password
database-name
^(test).*
匹配以test开头的库)table-name
server-id
5401-5404
)scan.incremental.snapshot.enabled
scan.startup.mode
initial
(快照+binlog)、earliest-offset
(从最早binlog开始)、latest-offset
(从最新binlog开始)heartbeat.interval
debezium.binary.handling.mode
base64
(转Base64字符串)、hex
(转十六进制),适用于BLOB/VARBINARY类型四、环境验证与测试
1. 准备测试数据(MySQL)
-- 创建测试数据库和表CREATE DATABASE mydb;USE mydb;CREATE TABLE orders ( order_id INT PRIMARY KEY, order_date TIMESTAMP, customer_name VARCHAR(100), price DECIMAL(10, 2), order_status BOOLEAN);-- 插入测试数据INSERT INTO orders VALUES (1, \'2023-01-01 10:00:00\', \'Alice\', 100.50, true),(2, \'2023-01-02 11:00:00\', \'Bob\', 200.75, false);
2. 使用Flink SQL验证
-- 查询MySQL CDC表数据SELECT * FROM mysql_orders;-- 观察输出:应显示插入的两条记录-- 后续在MySQL中更新数据,Flink会实时捕获变更UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 验证示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;public class MySqlCdcExample { public static void main(String[] args) throws Exception { // 创建MySQL Source MySqlSource<String> source = MySqlSource.<String>builder() .hostname(\"192.168.1.100\") .port(3306) .databaseList(\"mydb\") .tableList(\"mydb.orders\") .username(\"flink_cdc\") .password(\"flink123\") .deserializer(new JsonDebeziumDeserializationSchema()) // 转为JSON格式 .startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog) .build(); // 配置Flink环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 5秒checkpoint env.fromSource(source, WatermarkStrategy.noWatermarks(), \"MySQL CDC Source\") .print(); // 打印到控制台 env.execute(\"MySQL CDC Test\"); }}
4. 验证关键点
-
日志检查:
- Flink 日志应包含
Binlog offset on checkpoint
字样,表明成功获取 binlog 位置 - 无
Access denied
或Permission denied
错误,确认MySQL权限正确
- Flink 日志应包含
-
数据变更测试:
- 在MySQL中执行
INSERT/UPDATE/DELETE
操作,Flink 应实时输出变更数据 - 查看输出中的
row_kind
字段:+I
(插入)、-D
(删除)、+U
(更新后)、-U
(更新前)
- 在MySQL中执行
-
增量快照验证:
- 若表数据量大,查看Flink Web UI的并行度,增量快照模式下多个任务应并行读取
- 日志中无
FLUSH TABLES WITH READ LOCK
相关记录,确认未获取全局锁
五、常见问题与解决方案
-
权限不足错误:
ERROR: Access denied for user \'flink_cdc\'@\'localhost\' (using password: YES)
- 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含
REPLICATION SLAVE
权限
- 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含
-
Server ID冲突:
ERROR: Another MySQL binlog client is using the same server id
- 解决方案:修改
server-id
为唯一值,或在Flink SQL中通过\'server-id\'=\'5401-5404\'
设置范围
- 解决方案:修改
-
增量快照失败:
ERROR: Table has no primary key, cannot split snapshot chunks
- 解决方案:为表添加主键,或设置
scan.incremental.snapshot.chunk.key-column
为非空列(如\'scan.incremental.snapshot.chunk.key-column\'=\'unique_id\'
)
- 解决方案:为表添加主键,或设置
-
binlog未启用:
ERROR: Binary logging is not enabled
- 解决方案:检查MySQL配置文件,确认
log-bin
已启用,重启MySQL服务
- 解决方案:检查MySQL配置文件,确认
通过以上步骤,可完成Flink MySQL CDC的环境配置与验证。生产环境中建议结合实际需求调整并行度、checkpoint策略和GTID配置,以确保数据一致性和系统稳定性。