> 技术文档 > Flink CDC yaml实现PG到Doris的数据同步,附样例_flinkcdc yaml配置文件详解

Flink CDC yaml实现PG到Doris的数据同步,附样例_flinkcdc yaml配置文件详解



以下是通过 Flink SQL + YAML 配置文件实现 PostgreSQL 到 Doris 实时同步的完整方案,包含架构设计、配置文件和详细操作步骤,适合新手快速上手:


一、整体架构

+---------------+ +----------------+ +---------------+ +---------------+| PostgreSQL | | Flink CDC | | Flink SQL | | Doris || (Source DB) | --CDC-> | (YAML+SQL配置) | --SQL-> | (实时ETL) | ----> | (Sink DB) |+---------------+ +----------------+ +---------------+ +---------------+
  • CDC 采集:通过 Flink CDC 3.0 的 postgres-cdc 连接器捕获变更数据
  • SQL 处理:使用 Flink SQL 定义 Source/Sink 并执行同步逻辑
  • YAML 配置:统一管理作业参数(并行度、检查点等)

二、环境准备

1. 软件版本
- Flink 1.16+- Flink CDC 3.0+- Doris 1.2+- PostgreSQL 10+
2. 依赖文件

将以下 JAR 文件放入 Flink 的 lib/ 目录:

  • flink-connector-postgres-cdc-3.0.0.jar
  • flink-doris-connector-1.2.0.jar

三、配置文件详解

1. Flink 全局配置 (flink-conf.yaml)
# 启用检查点(保证Exactly-Once语义)execution.checkpointing.interval: 5000execution.checkpointing.mode: EXACTLY_ONCE# 并行度配置parallelism.default: 2# 状态后端(推荐RocksDB)state.backend: rocksdbstate.checkpoints.dir: hdfs:///flink/checkpoints
2. 同步作业配置 (sync-job.yaml)
# 定义Source表tables: - name: pg_source type: source schema: - name: id type: BIGINT - name: name type: STRING - name: op type: STRING connector: type: postgres-cdc hostname: localhost port: 5432 username: postgres password: your_password database-name: your_db schema-name: public table-name: your_table slot.name: flink_slot# 定义Sink表 - name: doris_sink type: sink schema: - name: id type: BIGINT - name: name type: STRING - name: op_type type: STRING connector: type: doris fenodes: doris_fe:8030 username: admin password: doris_pass table.identifier: db.target_table# 同步逻辑sql: | INSERT INTO doris_sink SELECT id, name, CASE WHEN op = \'d\' THEN \'DELETE\' ELSE \'UPSERT\' END AS op_type FROM pg_source

四、核心实现步骤

1. 启动 Flink SQL Client
# 进入Flink安装目录./bin/sql-client.sh -i sync-job.yaml
2. 执行同步作业
-- 在SQL Client中提交作业SET \'pipeline.name\' = \'PostgreSQL-to-Doris-Sync\';EXECUTE STATEMENT SET BEGININSERT INTO doris_sink ...; -- 配置文件已定义,此处无需重复END;

五、关键配置解析

1. PostgreSQL CDC Source 配置
connector: type: postgres-cdc slot.name: flink_slot # 必须创建逻辑复制槽 decoding.plugin.name: pgoutput # PG解码插件
2. Doris Sink 配置
connector: sink.batch.size: 1000 # 批量写入条数 sink.batch.interval: 5000 # 批量间隔(ms) sink.max-retries: 3 # 写入重试次数
3. 数据处理逻辑
-- 将CDC的op字段转换为Doris可识别的操作类型CASE WHEN op = \'d\' THEN \'DELETE\' ELSE \'UPSERT\' END AS op_type

六、验证数据同步

1. 检查 Flink 作业状态
# 访问Flink Web UI (http://localhost:8081)# 查看 \"Running Jobs\" 是否有同步作业
2. Doris 数据验证
-- 查看数据写入情况SELECT * FROM db.target_table LIMIT 10;-- 检查删除操作同步DELETE FROM public.your_table WHERE id=100;SELECT * FROM db.target_table WHERE id=100; -- 应被标记为DELETE

七、常见问题处理

1. 逻辑复制槽冲突
# 删除残留的复制槽SELECT pg_drop_replication_slot(\'flink_slot\');
2. 字段类型不匹配
# 在schema中显式定义类型转换- name: create_time type: TIMESTAMP(3) from: created_at # 源表字段名
3. 数据延迟优化
# 调整并行度parallelism.default: 4# 增加Source配置connector: scan.startup.mode: latest-offset # 跳过历史数据

八、完整配置示例

目录结构
/flink-job/ ├── lib/ │ ├── flink-connector-postgres-cdc-3.0.0.jar │ └── flink-doris-connector-1.2.0.jar ├── conf/ │ └── flink-conf.yaml └── sync-job.yaml
运行命令
./bin/sql-client.sh \\ -i conf/sync-job.yaml \\ -D fs.hdfs.hadoopconf=/path/to/hadoop/conf

通过以上配置和代码,即可实现基于 YAML 的声明式数据同步。建议先在小数据量场景下测试,再逐步扩展到生产环境。