
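Flink CDC Oracle connection test and LogMiner walkthrough

Flink CDC Oracle connection test

1. Test objective

Verify that the privileges granted to a user in the Oracle database are sufficient for Flink CDC to run its data export job, so that the privilege set meets the functional requirements while still following the principle of least privilege.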

2. Test environment

• Oracle database version: Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production

• Flink version: Flink 1.20.0

• Flink CDC connector version: flink-sql-connector-oracle-cdc-3.3.0

• Operating system: CentOS Linux release 7.2.1511

• Network: gigabit NIC

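3. Required user privileges

Based on the Oracle requirements described in the official Flink CDC documentation, the connecting user is recommended to have the following privileges:

Database connection privileges

CREATE SESSION: lets the user log in to the Oracle database.

SET CONTAINER: lets the user switch between pluggable databases (PDBs) in a multitenant environment. This applies to Oracle 12c and later, not to the 11g database tested here.

Query privileges

SELECT ANY TABLE: lets the user query any table in the database.

SELECT_CATALOG_ROLE: lets the user query the data dictionary views.

SELECT ANY TRANSACTION: lets the user query information about any transaction.

Data operation and maintenance privileges

FLASHBACK ANY TABLE: lets the user run flashback queries against any table or flash any table back to an earlier state.

ANALYZE ANY: lets the user analyze any table or index.

LOGMINING: lets the user use LogMiner. This privilege exists from Oracle 12c onward.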

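Data definition language (DDL) privileges

CREATE TABLE: lets the user create tables.

LOCK ANY TABLE: lets the user lock any table.

ALTER ANY TABLE: lets the user alter the structure of any table.

CREATE SEQUENCE: lets the user create sequences.

Dynamic performance view privileges

SELECT ON V_$DATABASE: lets the user query database configuration information.

SELECT ON V_$LOG: lets the user query redo log information.

SELECT ON V_$LOG_HISTORY: lets the user query redo log history.

SELECT ON V_$LOGMNR_LOGS: lets the user query the log files registered with LogMiner.

SELECT ON V_$LOGMNR_CONTENTS: lets the user query the contents mined by LogMiner.

SELECT ON V_$LOGMNR_PARAMETERS: lets the user query LogMiner parameters.

SELECT ON V_$LOGFILE: lets the user query redo log file information.

SELECT ON V_$ARCHIVED_LOG: lets the user query archived log information.

SELECT ON V_$ARCHIVE_DEST_STATUS: lets the user query the status of archive log destinations.

System package execute privileges

EXECUTE ON DBMS_LOGMNR: lets the user run LogMiner operations.

EXECUTE ON DBMS_LOGMNR_D: lets the user run LogMiner dictionary operations.

Database changes

SUPPLEMENTAL LOG: enable minimal supplemental logging at the database level, which gives LogMiner and other log-based features (such as data recovery and data replication) the log information they need.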
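LogMiner-based capture reads archived redo logs, so besides supplemental logging the database is normally expected to run in ARCHIVELOG mode. The test notes do not show this check; the query below is a minimal sketch, assuming a session that is allowed to read V$DATABASE (for example a DBA session).

```sql
-- Assumption: run as a user with SELECT on V_$DATABASE (e.g. a DBA).
-- LOG_MODE is expected to be ARCHIVELOG for log-based CDC.
SELECT name,
       log_mode
FROM   v$database;
```

If LOG_MODE shows NOARCHIVELOG, archiving has to be enabled by a DBA before the connector can rely on archived logs.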

4. Test procedure

1. Create the Flink CDC user in Oracle

CREATE USER fcdcuser IDENTIFIED BY fcdcpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

2. Grant the basic privileges

GRANT CREATE SESSION TO fcdcuser;
GRANT SELECT ANY TABLE TO fcdcuser;

3. Create the Oracle mapping table in Flink SQL

CREATE TABLE cdctest (
  `ID` INT NOT NULL,
  `NAME` STRING,
  PRIMARY KEY (`ID`) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = '192.168.100.***',
  'port' = '1521',
  'username' = 'fcdcuser',
  'password' = 'fcdcpw',
  'database-name' = 'demo',
  'schema-name' = 'FLINKUSER',
  'table-name' = 'ORACLECDC'
);

4. Query the cdctest mapping table

Flink SQL> select * from cdctest;

5. Errors encountered and the grants that resolved them

Each failure is listed with the grant or change that followed it. The log's OriginalSql field duplicates the Sql field and is not repeated here.

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 1031, Position : 26, Sql = analyze table "FLINKUSER"."ORACLECDC" compute statistics for table, Error Msg = ORA-01031: insufficient privileges

Grant:
GRANT ANALYZE ANY TO fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 942, Position : 24, Sql = SELECT CURRENT_SCN FROM V$DATABASE, Error Msg = ORA-00942: table or view does not exist

Grant:
GRANT SELECT ON V_$DATABASE to fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 942, Position : 80, Sql = SELECT MIN(FIRST_CHANGE#) FROM (SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM V$LOG UNION SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM V$ARCHIVED_LOG WHERE DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL' AND ROWNUM=1) AND STATUS='A'), Error Msg = ORA-00942: table or view does not exist

Grant:
GRANT SELECT ON V_$LOG TO fcdcuser;
GRANT SELECT ON V_$LOG_HISTORY TO fcdcuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO fcdcuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 1031, Position : 0, Sql = CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0)), Error Msg = ORA-01031: insufficient privileges

Grant:
GRANT CREATE TABLE TO fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

Change:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Alert log output for this change:
Fri Mar 28 09:53:51 2025
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
SUPLOG: Previous supplemental logging attributes at scn = 159574141497
SUPLOG: minimal = OFF, primary key = OFF
SUPLOG: unique = OFF, foreign key = OFF, all column = OFF
SUPLOG: procedural replication = OFF
Fri Mar 28 09:54:15 2025
SUPLOG: New supplemental logging attributes at scn = 159574142455
SUPLOG: minimal = ON, primary key = OFF
SUPLOG: unique = OFF, foreign key = OFF, all column = OFF
SUPLOG: procedural replication = OFF
Completed: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 6550, Position : 38, Sql = BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;, Error Msg = ORA-06550: line 1, column 39:
PLS-00201: identifier 'DBMS_LOGMNR_D' must be declared
ORA-06550: line 1, column 7:
PL/SQL: Statement ignored

Grant:
GRANT EXECUTE ON DBMS_LOGMNR_D TO fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 942, Position : 29, Sql = SELECT FILENAME AS NAME FROM V$LOGMNR_LOGS, Error Msg = ORA-00942: table or view does not exist

Grant:
GRANT SELECT ON V_$LOGMNR_LOGS TO fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 942, Position : 216, Sql = SELECT MIN(F.MEMBER) AS FILE_NAME, L.FIRST_CHANGE# FIRST_CHANGE, L.NEXT_CHANGE# NEXT_CHANGE, L.ARCHIVED, L.STATUS, 'ONLINE' AS TYPE, L.SEQUENCE# AS SEQ, 'NO' AS DICT_START, 'NO' AS DICT_END, L.THREAD# AS THREAD FROM V$LOGFILE F, V$LOG L LEFT JOIN V$ARCHIVED_LOG A ON A.FIRST_CHANGE# = L.FIRST_CHANGE# AND A.NEXT_CHANGE# = L.NEXT_CHANGE# WHERE (A.STATUS <> 'A' OR A.FIRST_CHANGE# IS NULL) AND F.GROUP# = L.GROUP# GROUP BY F.GROUP#, L.FIRST_CHANGE#, L.NEXT_CHANGE#, L.STATUS, L.ARCHIVED, L.SEQUENCE#, L.THREAD# UNION SELECT A.NAME AS FILE_NAME, A.FIRST_CHANGE# FIRST_CHANGE, A.NEXT_CHANGE# NEXT_CHANGE, 'YES', NULL, 'ARCHIVED', A.SEQUENCE# AS SEQ, A.DICTIONARY_BEGIN, A.DICTIONARY_END, A.THREAD# AS THREAD FROM V$ARCHIVED_LOG A WHERE A.NAME IS NOT NULL AND A.ARCHIVED = 'YES' AND A.STATUS = 'A' AND A.NEXT_CHANGE# > 159574254187 AND A.DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL' AND ROWNUM=1) ORDER BY 7, Error Msg = ORA-00942: table or view does not exist

Grant:
GRANT SELECT ON V_$LOGFILE TO fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 6550, Position : 152, Sql = BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '/data/oradata/fast_recovery_area/DEMO/archivelog/2025_03_28/o1_mf_1_1323_myd1wh58_.arc', OPTIONS => DBMS_LOGMNR.ADDFILE);END;, Error Msg = ORA-06550: line 1, column 153:
PLS-00201: identifier 'DBMS_LOGMNR' must be declared
ORA-06550: line 1, column 7:
PL/SQL: Statement ignored

Grant:
GRANT EXECUTE ON DBMS_LOGMNR TO fcdcuser;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 942, Position : 32, Sql = SELECT VALUE FROM v$statname n, v$mystat m WHERE n.name='session uga memory' AND n.statistic#=m.statistic#, Error Msg = ORA-00942: table or view does not exist

Grant:
GRANT SELECT ON V_$STATNAME TO FCDCUSER;
GRANT SELECT ON V_$MYSTAT TO FCDCUSER;

Error:
[ERROR] Could not execute SQL statement. Reason:
Error : 942, Position : 160, Sql = SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# FROM V$LOGMNR_CONTENTS WHERE SCN > :1 AND SCN <= :2 AND (SEG_OWNER IS NULL OR SEG_OWNER NOT IN ('APPQOSSYS','AUDSYS','CTXSYS','DVSYS','DBSFWUSER','DBSNMP','GSMADMIN_INTERNAL','LBACSYS','MDSYS','OJVMSYS','OLAPSYS','ORDDATA','ORDSYS','OUTLN','SYS','SYSTEM','WMSYS','XDB')) AND ((OPERATION_CODE IN (6,7,34,36) OR (OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') AND INFO NOT LIKE 'INTERNAL DDL%' AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%')) ) OR (OPERATION_CODE IN (1,2,3,255) AND TABLE_NAME != 'LOG_MINING_FLUSH' AND (REGEXP_LIKE(SEG_OWNER,'^FLINKUSER$','i')) AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^FLINKUSER.ORACLECDC$','i')) )), Error Msg = ORA-00942: table or view does not exist

Grant:
GRANT SELECT ON V_$LOGMNR_CONTENTS TO fcdcuser;

Error (the same V$LOGMNR_CONTENTS query, now failing with ORA-01031 instead of ORA-00942):
[ERROR] Could not execute SQL statement. Reason:
Error : 1031, Position : 160, Sql = SELECT SCN, SQL_REDO, OPERATION_CODE, ... FROM V$LOGMNR_CONTENTS WHERE SCN > :1 AND SCN <= :2 AND ..., Error Msg = ORA-01031: insufficient privileges

Grant:
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO fcdcuser;

Error (unchanged after the previous grant):
[ERROR] Could not execute SQL statement. Reason:
Error : 1031, Position : 160, Sql = SELECT SCN, SQL_REDO, OPERATION_CODE, ... FROM V$LOGMNR_CONTENTS WHERE SCN > :1 AND SCN <= :2 AND ..., Error Msg = ORA-01031: insufficient privileges

Grant:
GRANT SELECT_CATALOG_ROLE TO fcdcuser;
GRANT EXECUTE_CATALOG_ROLE TO fcdcuser;

Error (unchanged after the previous grant):
[ERROR] Could not execute SQL statement. Reason:
Error : 1031, Position : 160, Sql = SELECT SCN, SQL_REDO, OPERATION_CODE, ... FROM V$LOGMNR_CONTENTS WHERE SCN > :1 AND SCN <= :2 AND ..., Error Msg = ORA-01031: insufficient privileges

Grant:
GRANT SELECT ANY TRANSACTION TO fcdcuser;

After these steps, the Oracle table could be queried successfully from Flink SQL.
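One pattern runs through the steps above: the connector queries V$ names (for example V$DATABASE), while every grant targets a V_$ name. The V$ names are public synonyms for SYS-owned V_$ views, and the privilege is granted on the underlying view; granting on the V$ name from a SYS session typically fails with ORA-02030. The query below is a small sketch for confirming the mapping, assuming the session can read ALL_SYNONYMS.

```sql
-- Show which SYS views sit behind some of the V$ names the connector uses.
-- Each row is expected to have TABLE_OWNER = SYS and a V_$ table name.
SELECT synonym_name,
       table_owner,
       table_name
FROM   all_synonyms
WHERE  owner = 'PUBLIC'
AND    synonym_name IN ('V$DATABASE', 'V$LOG', 'V$LOGFILE',
                        'V$ARCHIVED_LOG', 'V$LOGMNR_CONTENTS');
```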

5. Test results

Apart from enabling database-level supplemental logging, the Oracle user used by the Flink connection was granted only the following privileges:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
GRANT CREATE SESSION TO fcdcuser;
GRANT SELECT ON V_$DATABASE to fcdcuser;
GRANT SELECT ANY TABLE TO fcdcuser;
GRANT SELECT_CATALOG_ROLE TO fcdcuser;
GRANT EXECUTE_CATALOG_ROLE TO fcdcuser;
GRANT SELECT ANY TRANSACTION TO fcdcuser;
GRANT ANALYZE ANY TO fcdcuser;
GRANT CREATE TABLE TO fcdcuser;
GRANT CREATE SEQUENCE TO fcdcuser;
GRANT EXECUTE ON DBMS_LOGMNR TO fcdcuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO fcdcuser;
GRANT SELECT ON V_$LOG TO fcdcuser;
GRANT SELECT ON V_$LOG_HISTORY TO fcdcuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO fcdcuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO fcdcuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO fcdcuser;
GRANT SELECT ON V_$LOGFILE TO fcdcuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO fcdcuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO fcdcuser;
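To audit what the connection user actually holds against the list above, the data dictionary can be queried directly. This is a sketch that assumes a DBA session (the DBA_* views are not readable by ordinary users) and that the user name is stored in upper case as FCDCUSER.

```sql
-- System privileges granted directly to the CDC user.
SELECT privilege
FROM   dba_sys_privs
WHERE  grantee = 'FCDCUSER'
ORDER  BY privilege;

-- Roles granted to the CDC user (e.g. SELECT_CATALOG_ROLE).
SELECT granted_role
FROM   dba_role_privs
WHERE  grantee = 'FCDCUSER'
ORDER  BY granted_role;

-- Object privileges (V_$ views and the DBMS_LOGMNR packages).
SELECT owner,
       table_name,
       privilege
FROM   dba_tab_privs
WHERE  grantee = 'FCDCUSER'
ORDER  BY owner, table_name;
```

Comparing the three result sets with the grant list makes it easier to spot privileges that were added during troubleshooting and never removed.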

Actual test run

2025-03-31

Error:
Caused by: Error : 1031, Position : 18, Sql = LOCK TABLE "CCXE"."PUB_ORG_INFO" IN ROW SHARE MODE, OriginalSql = LOCK TABLE "CCXE"."PUB_ORG_INFO" IN ROW SHARE MODE, Error Msg = ORA-01031: insufficient privileges

Grant:
GRANT LOCK ANY TABLE TO flinkuser;
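LOCK ANY TABLE is a broad system privilege. Oracle's documented prerequisites for LOCK TABLE also accept an object privilege on the specific table, so a per-table grant may be a narrower alternative that fits the least-privilege goal. The sketch below was not part of the original test and should be treated as an assumption to verify in your own environment.

```sql
-- Assumption: an object privilege on the table is enough for LOCK TABLE,
-- so a per-table SELECT grant can stand in for LOCK ANY TABLE.
GRANT SELECT ON CCXE.PUB_ORG_INFO TO flinkuser;

-- Quick check from a flinkuser session; ROLLBACK releases the lock.
LOCK TABLE CCXE.PUB_ORG_INFO IN ROW SHARE MODE;
ROLLBACK;
```

The trade-off is maintenance: every captured table needs its own grant, whereas LOCK ANY TABLE covers all of them at once.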

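1. Role of archived logs (with supplemental logging): archived redo logs record every transactional change in the database and are the basis for both recovery and data synchronization. Supplemental logging adds the column values that log-based tools need to reconstruct each change. During synchronization, a tool such as Flink CDC parses the archived logs to obtain the changes made since the last synchronized position, which is how real-time incremental sync works.
2. Effect of purging archived logs: if archived logs are kept for only 7 days and the Flink CDC job is paused for 10 days before it is restarted, the archived logs for the first 3 days of the pause have already been purged. Those logs held the data changes for that period, and the tool cannot read what no longer exists, so the changes from those 3 days cannot be recovered through normal incremental sync.
3. The connector's log output shows that LogMiner parses the archived log files first and then the current redo log files. So when the Flink CDC job is restarted after a 10-day pause, it reads through the archived logs that are still available and then switches to the online redo logs.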
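Before restarting a job that has been paused for a long time, it can help to check how far back the archived logs still reach. The query below is a sketch under the assumption that V$ARCHIVED_LOG reflects what is actually on disk; if log files were removed at the operating-system level without an RMAN crosscheck, the view may still list them as available.

```sql
-- Oldest archived log still registered as available, per redo thread.
SELECT thread#,
       MIN(first_change#) AS oldest_scn,
       MIN(first_time)    AS oldest_time
FROM   v$archived_log
WHERE  name IS NOT NULL
AND    deleted = 'NO'
AND    status  = 'A'
GROUP  BY thread#;
```

If the position the job needs to resume from is older than OLDEST_SCN, the missing range cannot be mined, and a fresh snapshot of the affected tables is the usual way to get back to a consistent state.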

Query the database-level supplemental logging status

SELECT supplemental_log_data_min,
supplemental_log_data_pk,
supplemental_log_data_ui,
supplemental_log_data_fk,
supplemental_log_data_all
FROM v$database;

Query the table-level supplemental logging status

select * from dba_log_groups;

Drop primary-key supplemental logging

SQL> ALTER TABLE FLINKUSER.PUB_ORG_INFO_BAO DROP SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

Table altered.

Add primary-key supplemental logging

SQL> ALTER TABLE FLINKUSER.PUB_ORG_INFO_BAO ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

Table altered.

Supplemental log groups

Add a log group (your_table and the column names are placeholders):

ALTER TABLE your_table ADD SUPPLEMENTAL LOG GROUP log_group_unique_cols (unique_column1, unique_column2) ALWAYS;

Drop a log group:

SQL> ALTER TABLE FLINKUSER.PUB_ORG_INFO_BAO DROP SUPPLEMENTAL LOG GROUP log_group_unique_cols;
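The statements above cover primary-key logging and user-defined log groups. The Flink CDC Oracle connector documentation also shows all-column supplemental logging on each captured table, so that change events carry complete row images. The sketch below applies it to the table used in this section; whether the extra redo volume is acceptable is an assumption to validate per environment.

```sql
-- Log all columns for every change to the captured table.
ALTER TABLE FLINKUSER.PUB_ORG_INFO_BAO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Confirm: a group of type 'ALL COLUMN LOGGING' should now be listed.
SELECT log_group_name,
       log_group_type,
       always
FROM   dba_log_groups
WHERE  owner = 'FLINKUSER'
AND    table_name = 'PUB_ORG_INFO_BAO';
```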