Flink CDC Connection Test against Oracle and a LogMiner Walkthrough
Flink CDC Connection Test against Oracle
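1. Test objective
Verify that the privileges granted to a user in the Oracle database are sufficient for Flink CDC to run its data export job, so that the grants cover the functional requirements while still following the principle of least privilege.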
2. Test environment
• Oracle database version: Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
• Flink version: Flink 1.20.0
• Flink CDC connector version: flink-sql-connector-oracle-cdc-3.3.0
• Operating system: CentOS Linux release 7.2.1511
• Network: Gigabit Ethernet NIC
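3. User privilege requirements
According to the Oracle connector requirements in the official Flink CDC documentation, the connecting user is recommended to have the following privileges:
Database connection privileges
• CREATE SESSION: allows the user to log in to the Oracle database.
• SET CONTAINER: allows the user to switch between pluggable databases (PDBs) in a multitenant environment. Multitenant databases were introduced in Oracle 12c, so this privilege does not apply to the 11g instance tested here.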
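Data query privileges
• SELECT ANY TABLE: allows the user to query any table in the database.
• SELECT_CATALOG_ROLE: a role that allows the user to query the data dictionary views.
• SELECT ANY TRANSACTION: allows the user to query information about any transaction.
Data operation and maintenance privileges
• FLASHBACK ANY TABLE: allows the user to restore data or query historical data on any table.
• ANALYZE ANY: allows the user to analyze any table or index.
• LOGMINING: allows the user to use LogMiner. This privilege exists only in Oracle 12c and later, so it cannot be granted on the 11g instance tested here.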
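Data definition language (DDL) privileges
• CREATE TABLE: allows the user to create tables.
• LOCK ANY TABLE: allows the user to lock any table.
• ALTER ANY TABLE: allows the user to alter the structure of any table.
• CREATE SEQUENCE: allows the user to create sequences.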
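Dynamic performance view privileges
The V$ names used in queries are public synonyms for the underlying SYS.V_$ views, which is why the grants are issued on the V_$ names.
• SELECT ON V_$DATABASE: allows the user to query the database configuration.
• SELECT ON V_$LOG: allows the user to query redo log information.
• SELECT ON V_$LOG_HISTORY: allows the user to query redo log history.
• SELECT ON V_$LOGMNR_LOGS: allows the user to query the log files registered with LogMiner.
• SELECT ON V_$LOGMNR_CONTENTS: allows the user to query the log contents mined by LogMiner.
• SELECT ON V_$LOGMNR_PARAMETERS: allows the user to query the LogMiner parameters.
• SELECT ON V_$LOGFILE: allows the user to query redo log file information.
• SELECT ON V_$ARCHIVED_LOG: allows the user to query archived log information.
• SELECT ON V_$ARCHIVE_DEST_STATUS: allows the user to query the status of the archive log destinations.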
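System package execution privileges
• EXECUTE ON DBMS_LOGMNR: allows the user to run LogMiner operations.
• EXECUTE ON DBMS_LOGMNR_D: allows the user to run LogMiner dictionary operations.
Database changes
• SUPPLEMENTAL LOG: enables minimal supplemental logging at the database level, which provides the extra redo information needed by LogMiner and other log-based features such as data recovery and data replication.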
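The privilege list above can be rendered as a grant script. The following is a minimal sketch rather than an official tool: `build_grants` and the constant names are hypothetical, the user name `fcdcuser` is the one used in the test below, and the version switch simply assumes that SET CONTAINER and LOGMINING are skipped on releases before 12c.

```python
from typing import List

# System privileges and roles from the list above that exist on 11g and later.
PRIVILEGES_AND_ROLES = [
    "CREATE SESSION", "SELECT ANY TABLE", "SELECT_CATALOG_ROLE",
    "SELECT ANY TRANSACTION", "FLASHBACK ANY TABLE", "ANALYZE ANY",
    "CREATE TABLE", "LOCK ANY TABLE", "ALTER ANY TABLE", "CREATE SEQUENCE",
]
# Privileges that only exist on Oracle 12c and later.
PRIVILEGES_12C_ONLY = ["SET CONTAINER", "LOGMINING"]
# Underlying views of the V$ synonyms listed above.
V_VIEWS = [
    "V_$DATABASE", "V_$LOG", "V_$LOG_HISTORY", "V_$LOGMNR_LOGS",
    "V_$LOGMNR_CONTENTS", "V_$LOGMNR_PARAMETERS", "V_$LOGFILE",
    "V_$ARCHIVED_LOG", "V_$ARCHIVE_DEST_STATUS",
]
PACKAGES = ["DBMS_LOGMNR", "DBMS_LOGMNR_D"]


def build_grants(user: str, oracle_major_version: int) -> List[str]:
    """Render the recommended privilege list as GRANT statements (hypothetical helper)."""
    privileges = list(PRIVILEGES_AND_ROLES)
    if oracle_major_version >= 12:
        privileges += PRIVILEGES_12C_ONLY
    grants = [f"GRANT {p} TO {user};" for p in privileges]
    grants += [f"GRANT SELECT ON {v} TO {user};" for v in V_VIEWS]
    grants += [f"GRANT EXECUTE ON {p} TO {user};" for p in PACKAGES]
    return grants


if __name__ == "__main__":
    # Print the script for the 11g test instance.
    for statement in build_grants("fcdcuser", 11):
        print(statement)
```

The script covers the full recommended list; the test that follows starts from a much smaller set and adds grants only when an error demands them.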
4. Test process record
1. Create the Flink CDC user in Oracle
CREATE USER fcdcuser IDENTIFIED BY fcdcpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
2. Grant the basic privileges
GRANT CREATE SESSION TO fcdcuser;
GRANT SELECT ANY TABLE TO fcdcuser;
3. Create the Oracle mapping table in Flink SQL
CREATE TABLE cdctest (
  `ID` INT NOT NULL,
  `NAME` STRING,
  PRIMARY KEY (`ID`) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = '192.168.100.***',
  'port' = '1521',
  'username' = 'fcdcuser',
  'password' = 'fcdcpw',
  'database-name' = 'demo',
  'schema-name' = 'FLINKUSER',
  'table-name' = 'ORACLECDC'
);
4. Query the cdctest mapping table
Flink SQL> select * from cdctest;
5. Errors encountered during the test and the fix applied for each
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 1031, Position : 26, Sql = analyze table "FLINKUSER"."ORACLECDC" compute statistics for table, OriginalSql = analyze table "FLINKUSER"."ORACLECDC" compute statistics for table, Error Msg = ORA-01031: insufficient privileges
Grant:
GRANT ANALYZE ANY TO fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 942, Position : 24, Sql = SELECT CURRENT_SCN FROM V$DATABASE, OriginalSql = SELECT CURRENT_SCN FROM V$DATABASE, Error Msg = ORA-00942: table or view does not exist
Grant:
GRANT SELECT ON V_$DATABASE to fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 942, Position : 80, Sql = SELECT MIN(FIRST_CHANGE#) FROM (SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM V$LOG UNION SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM V$ARCHIVED_LOG WHERE DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL' AND ROWNUM=1) AND STATUS='A'), OriginalSql = SELECT MIN(FIRST_CHANGE#) FROM (SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM V$LOG UNION SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM V$ARCHIVED_LOG WHERE DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL' AND ROWNUM=1) AND STATUS='A'), Error Msg = ORA-00942: table or view does not exist
Grant:
GRANT SELECT ON V_$LOG TO fcdcuser;
GRANT SELECT ON V_$LOG_HISTORY TO fcdcuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO fcdcuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 1031, Position : 0, Sql = CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0)), OriginalSql = CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0)), Error Msg = ORA-01031: insufficient privileges
Grant:
GRANT CREATE TABLE TO fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: io.debezium.DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
Database change:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
Alert log output:
Fri Mar 28 09:53:51 2025
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
SUPLOG: Previous supplemental logging attributes at scn = 159574141497
SUPLOG: minimal = OFF, primary key = OFF
SUPLOG: unique = OFF, foreign key = OFF, all column = OFF
SUPLOG: procedural replication = OFF
Fri Mar 28 09:54:15 2025
SUPLOG: New supplemental logging attributes at scn = 159574142455
SUPLOG: minimal = ON, primary key = OFF
SUPLOG: unique = OFF, foreign key = OFF, all column = OFF
SUPLOG: procedural replication = OFF
Completed: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 6550, Position : 38, Sql = BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;, OriginalSql = BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;, Error Msg = ORA-06550: line 1, column 39:
PLS-00201: identifier 'DBMS_LOGMNR_D' must be declared
ORA-06550: line 1, column 7:
PL/SQL: Statement ignored
Grant:
GRANT EXECUTE ON DBMS_LOGMNR_D TO fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 942, Position : 29, Sql = SELECT FILENAME AS NAME FROM V$LOGMNR_LOGS, OriginalSql = SELECT FILENAME AS NAME FROM V$LOGMNR_LOGS, Error Msg = ORA-00942: table or view does not exist
Grant:
GRANT SELECT ON V_$LOGMNR_LOGS TO fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 942, Position : 216, Sql = SELECT MIN(F.MEMBER) AS FILE_NAME, L.FIRST_CHANGE# FIRST_CHANGE, L.NEXT_CHANGE# NEXT_CHANGE, L.ARCHIVED, L.STATUS, 'ONLINE' AS TYPE, L.SEQUENCE# AS SEQ, 'NO' AS DICT_START, 'NO' AS DICT_END, L.THREAD# AS THREAD FROM V$LOGFILE F, V$LOG L LEFT JOIN V$ARCHIVED_LOG A ON A.FIRST_CHANGE# = L.FIRST_CHANGE# AND A.NEXT_CHANGE# = L.NEXT_CHANGE# WHERE (A.STATUS <> 'A' OR A.FIRST_CHANGE# IS NULL) AND F.GROUP# = L.GROUP# GROUP BY F.GROUP#, L.FIRST_CHANGE#, L.NEXT_CHANGE#, L.STATUS, L.ARCHIVED, L.SEQUENCE#, L.THREAD# UNION SELECT A.NAME AS FILE_NAME, A.FIRST_CHANGE# FIRST_CHANGE, A.NEXT_CHANGE# NEXT_CHANGE, 'YES', NULL, 'ARCHIVED', A.SEQUENCE# AS SEQ, A.DICTIONARY_BEGIN, A.DICTIONARY_END, A.THREAD# AS THREAD FROM V$ARCHIVED_LOG A WHERE A.NAME IS NOT NULL AND A.ARCHIVED = 'YES' AND A.STATUS = 'A' AND A.NEXT_CHANGE# > 159574254187 AND A.DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS='VALID' AND TYPE='LOCAL' AND ROWNUM=1) ORDER BY 7, OriginalSql = (identical to Sql), Error Msg = ORA-00942: table or view does not exist
Grant:
GRANT SELECT ON V_$LOGFILE TO fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 6550, Position : 152, Sql = BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '/data/oradata/fast_recovery_area/DEMO/archivelog/2025_03_28/o1_mf_1_1323_myd1wh58_.arc', OPTIONS => DBMS_LOGMNR.ADDFILE);END;, OriginalSql = BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '/data/oradata/fast_recovery_area/DEMO/archivelog/2025_03_28/o1_mf_1_1323_myd1wh58_.arc', OPTIONS => DBMS_LOGMNR.ADDFILE);END;, Error Msg = ORA-06550: line 1, column 153:
PLS-00201: identifier 'DBMS_LOGMNR' must be declared
ORA-06550: line 1, column 7:
PL/SQL: Statement ignored
Grant:
GRANT EXECUTE ON DBMS_LOGMNR TO fcdcuser;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 942, Position : 32, Sql = SELECT VALUE FROM v$statname n, v$mystat m WHERE n.name='session uga memory' AND n.statistic#=m.statistic#, OriginalSql = SELECT VALUE FROM v$statname n, v$mystat m WHERE n.name='session uga memory' AND n.statistic#=m.statistic#, Error Msg = ORA-00942: table or view does not exist
Grant:
GRANT SELECT ON V_$STATNAME TO FCDCUSER;
GRANT SELECT ON V_$MYSTAT TO FCDCUSER;
Error:
[ERROR] Could not execute SQL statement.
Reason: Error : 942, Position : 160, Sql = SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# FROM V$LOGMNR_CONTENTS WHERE SCN > :1 AND SCN <= :2 AND (SEG_OWNER IS NULL OR SEG_OWNER NOT IN ('APPQOSSYS','AUDSYS','CTXSYS','DVSYS','DBSFWUSER','DBSNMP','GSMADMIN_INTERNAL','LBACSYS','MDSYS','OJVMSYS','OLAPSYS','ORDDATA','ORDSYS','OUTLN','SYS','SYSTEM','WMSYS','XDB')) AND ((OPERATION_CODE IN (6,7,34,36) OR (OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') AND INFO NOT LIKE 'INTERNAL DDL%' AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%')) ) OR (OPERATION_CODE IN (1,2,3,255) AND TABLE_NAME != 'LOG_MINING_FLUSH' AND (REGEXP_LIKE(SEG_OWNER,'^FLINKUSER$','i')) AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME,'^FLINKUSER.ORACLECDC$','i')) )), OriginalSql = (identical to Sql, except that the bind placeholders :1 and :2 appear as ?), Error Msg = ORA-00942: table or view does not exist
Grant:
GRANT SELECT ON V_$LOGMNR_CONTENTS TO fcdcuser;
Error (the same V$LOGMNR_CONTENTS query now fails with ORA-01031 instead of ORA-00942):
[ERROR] Could not execute SQL statement.
Reason: Error : 1031, Position : 160, Sql = (same V$LOGMNR_CONTENTS query as above), OriginalSql = (same as above), Error Msg = ORA-01031: insufficient privileges
Grant:
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO fcdcuser;
Error (unchanged after the previous grant):
[ERROR] Could not execute SQL statement.
Reason: Error : 1031, Position : 160, Sql = (same V$LOGMNR_CONTENTS query as above), OriginalSql = (same as above), Error Msg = ORA-01031: insufficient privileges
Grant:
GRANT SELECT_CATALOG_ROLE TO fcdcuser;
GRANT EXECUTE_CATALOG_ROLE TO fcdcuser;
Error (unchanged after the previous grants):
[ERROR] Could not execute SQL statement.
Reason: Error : 1031, Position : 160, Sql = (same V$LOGMNR_CONTENTS query as above), OriginalSql = (same as above), Error Msg = ORA-01031: insufficient privileges
Grant:
GRANT SELECT ANY TRANSACTION TO fcdcuser;
After the steps above, the Oracle table could be read successfully.
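Two of the error patterns in this record map to a grant almost mechanically: PLS-00201 names the package that needs EXECUTE, and ORA-00942 on a V$ view is fixed by SELECT on the matching V_$ view. The sketch below illustrates that mapping only. `suggest_grants` is a hypothetical helper, not part of Flink CDC or Debezium, and it deliberately returns nothing for ORA-01031, because in this test that error needed manual investigation (the last one was only resolved by SELECT ANY TRANSACTION).

```python
import re
from typing import List


def suggest_grants(error_text: str, user: str) -> List[str]:
    """Heuristically map a connector error message to candidate GRANT statements."""
    suggestions: List[str] = []

    # PLS-00201 names the package that is not visible to the user.
    pattern = r"PLS-00201: identifier '(\w+)' must be declared"
    for package in re.findall(pattern, error_text):
        statement = f"GRANT EXECUTE ON {package.upper()} TO {user};"
        if statement not in suggestions:
            suggestions.append(statement)

    # ORA-00942 does not say which object is missing, so every V$ view
    # referenced in the statement is listed; some may already be granted.
    if "ORA-00942" in error_text:
        for view in re.findall(r"\bV\$(\w+)", error_text, flags=re.IGNORECASE):
            statement = f"GRANT SELECT ON V_${view.upper()} TO {user};"
            if statement not in suggestions:
                suggestions.append(statement)

    return suggestions


if __name__ == "__main__":
    message = (
        "Error : 942, Position : 24, Sql = SELECT CURRENT_SCN FROM V$DATABASE, "
        "OriginalSql = SELECT CURRENT_SCN FROM V$DATABASE, "
        "Error Msg = ORA-00942: table or view does not exist"
    )
    for line in suggest_grants(message, "fcdcuser"):
        print(line)  # prints: GRANT SELECT ON V_$DATABASE TO fcdcuser;
```

Because ORA-00942 does not identify the missing object, the suggestions may include views that were already granted; they are candidates to review, not a script to run blindly.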
5. Test results
Only the following database change and grants were applied for the Oracle user that Flink connects as:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
GRANT CREATE SESSION TO fcdcuser;
GRANT SELECT ON V_$DATABASE to fcdcuser;
GRANT SELECT ANY TABLE TO fcdcuser;
GRANT SELECT_CATALOG_ROLE TO fcdcuser;
GRANT EXECUTE_CATALOG_ROLE TO fcdcuser;
GRANT SELECT ANY TRANSACTION TO fcdcuser;
GRANT ANALYZE ANY TO fcdcuser;
GRANT CREATE TABLE TO fcdcuser;
GRANT CREATE SEQUENCE TO fcdcuser;
GRANT EXECUTE ON DBMS_LOGMNR TO fcdcuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO fcdcuser;
GRANT SELECT ON V_$LOG TO fcdcuser;
GRANT SELECT ON V_$LOG_HISTORY TO fcdcuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO fcdcuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO fcdcuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO fcdcuser;
GRANT SELECT ON V_$LOGFILE TO fcdcuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO fcdcuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO fcdcuser;
Test in the actual environment
2025-03-31
Caused by: Error : 1031, Position : 18, Sql = LOCK TABLE "CCXE"."PUB_ORG_INFO" IN ROW SHARE MODE, OriginalSql = LOCK TABLE "CCXE"."PUB_ORG_INFO" IN ROW SHARE MODE, Error Msg = ORA-01031: insufficient privileges
Grant:
GRANT LOCK ANY TABLE TO flinkuser;
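1. Role of the archived (supplemental) logs: archived logs record every transactional change made to the database and are the basis for both database recovery and data synchronization. During synchronization, a tool such as Flink CDC parses the archived logs to obtain the incremental changes made since the last sync, which is how it keeps the target up to date in real time.
2. Effect of purging archived logs: suppose archived logs are kept for only 7 days and the Flink CDC job is stopped for 10 days before being restarted. Of the archived logs generated during those 10 days, the first 3 days' worth have already been purged. Those logs held the changes made during that period and the sync tool can no longer read them, so the changes from those 3 days cannot be caught up through normal incremental synchronization.
3. From the logs it can be concluded that LogMiner parses the archived log files first and the current redo log files after them. So if the Flink CDC job is stopped for 10 days and then restarted, it reads through the archived logs again and then switches over to the online redo logs.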
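Points 2 and 3 can be made concrete with a little arithmetic. The sketch below is illustrative only and rests on stated assumptions: archived logs are purged purely by age, the SCN values and file names are made up, and `LogFile`, `unrecoverable_window` and `mining_order` are hypothetical names. The fields mirror the FIRST_CHANGE#, NEXT_CHANGE# and SEQUENCE# columns that appear in the connector's log file query quoted in the error messages earlier.

```python
from datetime import datetime, timedelta
from typing import List, NamedTuple


class LogFile(NamedTuple):
    name: str
    kind: str          # "ARCHIVED" or "ONLINE"
    sequence: int      # SEQUENCE#
    first_change: int  # FIRST_CHANGE#
    next_change: int   # NEXT_CHANGE#


def unrecoverable_window(paused_at: datetime, resumed_at: datetime,
                         retention: timedelta) -> timedelta:
    """Length of the period whose archived logs were purged before the restart."""
    oldest_available = resumed_at - retention
    return max(oldest_available - paused_at, timedelta(0))


def mining_order(logs: List[LogFile], resume_scn: int) -> List[LogFile]:
    """Logs needed to resume from resume_scn, ordered by sequence number.

    Archived logs have lower sequence numbers than the online redo logs,
    so they come first. Raises LookupError when the log covering
    resume_scn is no longer available.
    """
    needed = sorted((log for log in logs if log.next_change > resume_scn),
                    key=lambda log: log.sequence)
    if not needed or needed[0].first_change > resume_scn:
        raise LookupError(f"no available log covers SCN {resume_scn}")
    return needed


if __name__ == "__main__":
    gap = unrecoverable_window(datetime(2025, 3, 1), datetime(2025, 3, 11),
                               timedelta(days=7))
    print(gap.days)  # prints: 3

    logs = [
        LogFile("redo01.log", "ONLINE", 1325, 300, 2**48),
        LogFile("arch_1323.arc", "ARCHIVED", 1323, 100, 200),
        LogFile("arch_1324.arc", "ARCHIVED", 1324, 200, 300),
    ]
    print([log.name for log in mining_order(logs, 150)])
    # prints: ['arch_1323.arc', 'arch_1324.arc', 'redo01.log']
```

If `arch_1323.arc` had been purged, `mining_order(logs, 150)` would raise `LookupError`, which corresponds to the situation in point 2 where the missing days cannot be caught up incrementally.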
Query the database-level supplemental logging status
SELECT supplemental_log_data_min,
       supplemental_log_data_pk,
       supplemental_log_data_ui,
       supplemental_log_data_fk,
       supplemental_log_data_all
FROM v$database;
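The result of this query can be checked programmatically. The sketch below is a minimal illustration, assuming the row has already been fetched into a dictionary keyed by upper-case column name; `minimal_supplemental_logging_enabled` is a hypothetical helper and does not reproduce the connector's own check. It treats both YES and IMPLICIT in SUPPLEMENTAL_LOG_DATA_MIN as enabled, since IMPLICIT means minimal logging is on as a side effect of another supplemental logging level.

```python
from typing import Dict


def minimal_supplemental_logging_enabled(row: Dict[str, str]) -> bool:
    """Return True when database-level minimal supplemental logging is on."""
    return row["SUPPLEMENTAL_LOG_DATA_MIN"].upper() in ("YES", "IMPLICIT")


if __name__ == "__main__":
    # State before and after ALTER DATABASE ADD SUPPLEMENTAL LOG DATA,
    # matching the minimal = OFF / minimal = ON lines in the alert log.
    before = {"SUPPLEMENTAL_LOG_DATA_MIN": "NO", "SUPPLEMENTAL_LOG_DATA_PK": "NO"}
    after = {"SUPPLEMENTAL_LOG_DATA_MIN": "YES", "SUPPLEMENTAL_LOG_DATA_PK": "NO"}
    print(minimal_supplemental_logging_enabled(before))  # prints: False
    print(minimal_supplemental_logging_enabled(after))   # prints: True
```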
Query the table-level supplemental logging status
SELECT * FROM dba_log_groups;
Drop primary key supplemental logging from a table
SQL> ALTER TABLE FLINKUSER.PUB_ORG_INFO_BAO DROP SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
Table altered.
Add primary key supplemental logging to a table
SQL> ALTER TABLE FLINKUSER.PUB_ORG_INFO_BAO ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
Table altered.
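Supplemental log groups
Add a supplemental log group (your_table, log_group_unique_cols, unique_column1 and unique_column2 are placeholders):
ALTER TABLE your_table ADD SUPPLEMENTAL LOG GROUP log_group_unique_cols (unique_column1, unique_column2) ALWAYS;
Drop a supplemental log group:
SQL> ALTER TABLE FLINKUSER.PUB_ORG_INFO_BAO DROP SUPPLEMENTAL LOG GROUP log_group_unique_cols;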