> 技术文档 > Flink MySQL CDC 环境配置与验证_flink cdc mysql

Flink MySQL CDC 环境配置与验证_flink cdc mysql


一、MySQL 服务器配置详解
1. 启用二进制日志(Binlog)

MySQL CDC 依赖二进制日志获取增量数据,需在 MySQL 配置文件(my.cnfmy.ini)中添加以下配置:

# 启用二进制日志log-bin=mysql-bin# 二进制日志格式(推荐ROW模式,记录行级变更)binlog-format=ROW# 启用GTID(高可用必备)gtid-mode=ONenforce-gtid-consistency=ON# 从库同步时记录binlog(主从架构需要)log-slave-updates=ON# 避免长连接超时(大表快照时需要)interactive_timeout=3600wait_timeout=3600

配置说明

  • log-bin:指定二进制日志文件名前缀,MySQL 会自动生成如 mysql-bin.000001 的文件
  • binlog-format=ROW:相比 STATEMENT 模式,ROW 模式能精确记录每行数据的变更
  • gtid-mode:全局事务标识符,用于主从切换时保证数据一致性
  • log-slave-updates:若使用从库同步,需开启此配置让从库也记录 binlog
2. 创建专用用户并授权
-- 创建用户(替换为实际用户名和密码)CREATE USER \'flink_cdc\'@\'localhost\' IDENTIFIED BY \'flink123\';-- 授予必要权限(重要:REPLICATION SLAVE 用于读取binlog)GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO \'flink_cdc\'@\'localhost\';-- 刷新权限FLUSH PRIVILEGES;

权限说明

  • SELECT:读取表数据(快照阶段需要)
  • SHOW DATABASES:获取数据库列表(用于正则匹配监控库)
  • REPLICATION SLAVE:读取 binlog 必备权限
  • REPLICATION CLIENT:获取服务器状态(如binlog位置)
3. 配置唯一 Server ID

每个 Flink 作业需配置不同的 Server ID(避免 binlog 位置冲突):

# 在my.cnf中添加server-id=1001 # 任意唯一整数,建议范围5400-6400

说明:若 Flink 作业并行度为 N,则 Server ID 可设为范围(如 5400-5400+N),例如:

-- Flink SQL 中通过Hints设置Server ID范围SELECT * FROM mysql_table /*+ OPTIONS(\'server-id\'=\'5401-5404\') */;
二、Flink 环境配置步骤
1. 添加依赖(Maven 项目)

pom.xml 中添加 MySQL CDC 连接器依赖:

<dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>3.0.1</version>  <scope>provided</scope></dependency>
2. SQL Client 部署(非Maven环境)
  1. 下载连接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
  2. 将 JAR 包放入 $FLINK_HOME/lib/ 目录
  3. 重启 Flink 集群使依赖生效
三、Flink MySQL CDC 表定义与参数详解
1. 完整建表示例(Flink SQL)
-- 设置checkpoint间隔(可选)SET \'execution.checkpointing.interval\' = \'3s\';-- 创建MySQL CDC表CREATE TABLE mysql_orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, -- 可选:添加元数据列 db_name STRING METADATA FROM \'database_name\' VIRTUAL, table_name STRING METADATA FROM \'table_name\' VIRTUAL, op_ts TIMESTAMP_LTZ(3) METADATA FROM \'op_ts\' VIRTUAL, row_kind STRING METADATA FROM \'row_kind\' VIRTUAL, PRIMARY KEY(order_id) NOT ENFORCED) WITH ( \'connector\' = \'mysql-cdc\', \'hostname\' = \'192.168.1.100\', \'port\' = \'3306\', \'username\' = \'flink_cdc\', \'password\' = \'flink123\', \'database-name\' = \'mydb\', \'table-name\' = \'orders\', -- 可选参数详解 \'server-id\' = \'5401\', \'scan.incremental.snapshot.enabled\' = \'true\', \'scan.incremental.snapshot.chunk.size\' = \'8096\', \'scan.startup.mode\' = \'initial\', \'heartbeat.interval\' = \'30s\', \'debezium.binary.handling.mode\' = \'base64\');
2. 核心参数详解
参数名 必选 默认值 类型 说明 connector 是 无 String 固定为 mysql-cdc hostname 是 无 String MySQL 服务器IP或域名 username 是 无 String 连接MySQL的用户名 password 是 无 String 连接MySQL的密码 database-name 是 无 String 监控的数据库名,支持正则表达式(如 ^(test).* 匹配以test开头的库) table-name 是 无 String 监控的表名,支持正则表达式(如 `orders server-id 否 5400-6400随机 String Flink作业的唯一标识,需与其他MySQL客户端(如主从复制)不同,并行作业建议设为范围(如 5401-5404scan.incremental.snapshot.enabled 否 true Boolean 启用增量快照(并行读取大表,无需全局锁),建议保持默认 scan.startup.mode 否 initial String 启动模式:initial(快照+binlog)、earliest-offset(从最早binlog开始)、latest-offset(从最新binlog开始) heartbeat.interval 否 30s Duration 心跳间隔,用于更新binlog位置,避免长时间无变更时binlog被清理 debezium.binary.handling.mode 否 none String 二进制数据处理模式:base64(转Base64字符串)、hex(转十六进制),适用于BLOB/VARBINARY类型
四、环境验证与测试
1. 准备测试数据(MySQL)
-- 创建测试数据库和表CREATE DATABASE mydb;USE mydb;CREATE TABLE orders ( order_id INT PRIMARY KEY, order_date TIMESTAMP, customer_name VARCHAR(100), price DECIMAL(10, 2), order_status BOOLEAN);-- 插入测试数据INSERT INTO orders VALUES (1, \'2023-01-01 10:00:00\', \'Alice\', 100.50, true),(2, \'2023-01-02 11:00:00\', \'Bob\', 200.75, false);
2. 使用Flink SQL验证
-- 查询MySQL CDC表数据SELECT * FROM mysql_orders;-- 观察输出:应显示插入的两条记录-- 后续在MySQL中更新数据,Flink会实时捕获变更UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 验证示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;public class MySqlCdcExample { public static void main(String[] args) throws Exception { // 创建MySQL Source MySqlSource<String> source = MySqlSource.<String>builder() .hostname(\"192.168.1.100\") .port(3306) .databaseList(\"mydb\") .tableList(\"mydb.orders\") .username(\"flink_cdc\") .password(\"flink123\") .deserializer(new JsonDebeziumDeserializationSchema()) // 转为JSON格式 .startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog) .build(); // 配置Flink环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 5秒checkpoint env.fromSource(source, WatermarkStrategy.noWatermarks(), \"MySQL CDC Source\") .print(); // 打印到控制台 env.execute(\"MySQL CDC Test\"); }}
4. 验证关键点
  1. 日志检查

    • Flink 日志应包含 Binlog offset on checkpoint 字样,表明成功获取 binlog 位置
    • Access deniedPermission denied 错误,确认MySQL权限正确
  2. 数据变更测试

    • 在MySQL中执行 INSERT/UPDATE/DELETE 操作,Flink 应实时输出变更数据
    • 查看输出中的 row_kind 字段:+I(插入)、-D(删除)、+U(更新后)、-U(更新前)
  3. 增量快照验证

    • 若表数据量大,查看Flink Web UI的并行度,增量快照模式下多个任务应并行读取
    • 日志中无 FLUSH TABLES WITH READ LOCK 相关记录,确认未获取全局锁
五、常见问题与解决方案
  1. 权限不足错误

    ERROR: Access denied for user \'flink_cdc\'@\'localhost\' (using password: YES)
    • 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含 REPLICATION SLAVE 权限
  2. Server ID冲突

    ERROR: Another MySQL binlog client is using the same server id
    • 解决方案:修改 server-id 为唯一值,或在Flink SQL中通过 \'server-id\'=\'5401-5404\' 设置范围
  3. 增量快照失败

    ERROR: Table has no primary key, cannot split snapshot chunks
    • 解决方案:为表添加主键,或设置 scan.incremental.snapshot.chunk.key-column 为非空列(如 \'scan.incremental.snapshot.chunk.key-column\'=\'unique_id\'
  4. binlog未启用

    ERROR: Binary logging is not enabled
    • 解决方案:检查MySQL配置文件,确认 log-bin 已启用,重启MySQL服务

通过以上步骤,可完成Flink MySQL CDC的环境配置与验证。生产环境中建议结合实际需求调整并行度、checkpoint策略和GTID配置,以确保数据一致性和系统稳定性。