Debezium同步之DB2数据到Kafka的同步
目录
一、前言
二、概述
2.1 连接器的工作原理
2.2 快照
三、设置 Db2
3.1 将表格置于捕获模式
3.2 Db2 捕获代理配置对服务器负载和延迟的影响
3.3 Db2 捕获代理配置参数
四、部署
4.1 Db2 连接器配置示例
4.2 添加连接器配置
一、前言
Debezium 的 Db2 连接器可以捕获 Db2 数据库表中的行级更改。有关与此连接器兼容的 Db2 数据库版本的信息,请参阅Debezium 版本概述。
此连接器受到 SQL Server 的 Debezium 实现的强烈启发,该实现使用基于 SQL 的轮询模型,将表置于“捕获模式”。当表处于捕获模式时,Debezium Db2 连接器为对该表的每个行级更新生成并流式传输更改事件。
处于捕获方式的表具有关联的更改数据表,由 Db2 创建。对于处于捕获模式的表的每次更改,Db2 都会将有关该更改的数据添加到表的关联更改数据表中。更改数据表包含一行的每个状态的条目。它还有用于删除的特殊条目。Debezium Db2 连接器从更改数据表中读取更改事件并将事件发送到 Kafka 主题。
Debezium Db2 连接器第一次连接到 Db2 数据库时,连接器会读取表的一致快照,连接器被配置为捕获更改。默认情况下,这是所有非系统表。有一些连接器配置属性可让您指定将哪些表置于捕获模式,或从捕获模式中排除哪些表。
快照完成后,连接器开始向处于捕获模式的表发送已提交更新的更改事件。默认情况下,特定表的更改事件会转到与表同名的 Kafka 主题。应用程序和服务使用来自这些主题的更改事件。
连接器需要使用抽象语法表示法 (ASN) 库,这些库作为 Db2 for Linux 的标准部分提供。要使用 ASN 库,您必须拥有 IBM InfoSphere Data Replication (IIDR) 的许可证。您不必安装 IIDR 即可使用 ASN 库。 |
二、概述
Debezium Db2 连接器基于在 Db2 中启用 SQL 复制的ASN Capture/Apply 代理。捕获代理:
-
为处于捕获模式的表生成更改数据表。
-
以捕获模式监视表并将更改事件存储在相应的更改数据表中以更新这些表。
Debezium 连接器使用 SQL 接口来查询更改数据表中的更改事件。
数据库管理员必须将要捕获更改的表置于捕获模式。为了方便和自动化测试,C 中有Debezium 用户定义函数 (UDF),您可以对其进行编译,然后使用它来执行以下管理任务:
-
启动、停止和重新初始化 ASN 代理
-
将表格置于捕获模式
-
创建复制 (ASN) 模式和更改数据表
-
从捕获模式中删除表
或者,您可以使用 Db2 控制命令来完成这些任务。
在感兴趣的表处于捕获模式后,连接器会读取其相应的更改数据表以获取表更新的更改事件。连接器针对每个行级插入、更新和删除操作向与更改的表同名的 Kafka 主题发出更改事件。这是您可以修改的默认行为。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并且可以对每个行级更改事件做出反应。
通常,数据库管理员在表的生命周期中将表置于捕获模式。这意味着连接器没有对表所做的所有更改的完整历史记录。因此,当 Db2 连接器首次连接到特定 Db2 数据库时,它首先对处于捕获模式的每个表执行一致的快照。连接器完成快照后,连接器会从创建快照的点开始流式传输更改事件。通过这种方式,连接器从处于捕获模式的表的一致视图开始,并且不会删除在执行快照时所做的任何更改。
Debezium 连接器可以容忍故障。当连接器读取并生成更改事件时,它会记录更改数据表条目的日志序列号 (LSN)。LSN 是更改事件在数据库日志中的位置。如果连接器因任何原因停止,包括通信故障、网络问题或崩溃,则在重新启动时它会继续读取它停止的更改数据表。这包括快照。也就是说,如果在连接器停止时快照未完成,则在重新启动连接器时会开始一个新的快照。
2.1 连接器的工作原理
为了优化配置和运行 Debezium Db2 连接器,了解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及处理架构更改是很有帮助的。
2.2 快照
Db2 的复制功能并非旨在存储数据库更改的完整历史记录。因此,当 Debezium Db2 连接器首次连接到数据库时,它会对处于捕获模式的表进行一致的快照,并将此状态流式传输到 Kafka。这为表格内容建立了基线。
默认情况下,当 Db2 连接器执行快照时,它会执行以下操作:
-
确定哪些表处于捕获模式,因此必须包含在快照中。默认情况下,所有非系统表都处于捕获模式。连接器配置属性,例如
table.exclude.list
和table.include.list
让您指定哪些表应处于捕获模式。 -
在捕获模式下获取每个表的锁。这可确保在快照期间这些表中不会发生架构更改。锁定级别由
snapshot.isolation.mode
连接器配置属性确定。 -
读取服务器事务日志中最高(最新)的 LSN 位置。
-
捕获处于捕获模式的所有表的架构。连接器将此信息保存在其内部数据库历史主题中。
-
可选,释放在步骤 2 中获得的锁。通常,这些锁只保留很短的时间。
-
在步骤 3 中读取的 LSN 位置,连接器扫描捕获模式表及其架构。在扫描期间,连接器:
-
确认表是在快照开始之前创建的。如果不是,则快照会跳过该表。快照完成后,连接器开始发出更改事件,连接器为在快照期间创建的任何表生成更改事件。
-
为处于捕获模式的每个表中的每一行生成一个读取事件。所有读取事件都包含相同的 LSN 位置,也就是在步骤 3 中获得的 LSN 位置。
-
将每个读取事件发送到与表同名的 Kafka 主题。
-
-
在连接器偏移中记录快照的成功完成。
三、设置 Db2
为了让 Debezium 捕获提交到 Db2 表的更改事件,具有必要权限的 Db2 数据库管理员必须在数据库中配置表以捕获更改数据。开始运行 Debezium 后,您可以调整捕获代理的配置以优化性能。
3.1 将表格置于捕获模式
为了将表置于捕获模式,Debezium 提供了一组用户定义函数 (UDF) 以方便您使用。此处的过程显示了如何安装和运行这些管理 UDF。或者,您可以运行 Db2 控制命令将表置于捕获模式。然后管理员必须为您希望 Debezium 捕获的每个表启用 CDC。
先决条件
-
您以用户身份登录到 Db2
db2instl
。 -
在 Db2 主机上,Debezium 管理 UDF 位于 $HOME/asncdctools/src 目录中。UDF 可从Debezium 示例存储库中获得。
程序
-
bldrtn
使用Db2 提供的命令在 Db2 服务器主机上编译 Debezium 管理 UDF :cd $HOME/asncdctools/src
./bldrtn asncdc
-
如果数据库尚未运行,请启动它。替换
DB_NAME
为您希望 Debezium 连接到的数据库的名称。db2 start db DB_NAME
-
确保 JDBC 可以读取 Db2 元数据目录:
cd $HOME/sqllib/bnd
db2 bind db2schema.bnd blocking all grant public sqlerror continue
-
确保最近备份了数据库。ASN 代理必须具有最近的读取起点。如果您需要执行备份,请运行以下命令,这会修剪数据,以便只有最新版本可用。如果您不需要保留旧版本的数据,请指定
dev/null
备份位置。-
备份数据库。用适当的值替换
DB_NAME
和:BACK_UP_LOCATION
db2 backup db DB_NAME to BACK_UP_LOCATION
-
重启数据库:
db2 restart db DB_NAME
-
-
连接到数据库以安装 Debezium 管理 UDF。假设您以用户身份登录,
db2instl
因此应在该db2inst1
用户上安装 UDF。db2 connect to DB_NAME
-
复制 Debezium 管理 UDF 并为其设置权限:
cp $HOME/asncdctools/src/asncdc $HOME/sqllib/function
chmod 777 $HOME/sqllib/function
-
启用启动和停止 ASN 捕获代理的 Debezium UDF:
db2 -tvmf $HOME/asncdctools/src/asncdc_UDF.sql
-
创建 ASN 控制表:
db2 -tvmf $HOME/asncdctools/src/asncdctables.sql
-
启用将表添加到捕获模式并从捕获模式中删除表的 Debezium UDF:
db2 -tvmf $HOME/asncdctools/src/asncdcaddremove.sql
设置 Db2 服务器后,使用 UDF 通过 SQL 命令控制 Db2 复制 (ASN)。某些 UDF 需要返回值,在这种情况下,您可以使用 SQL
VALUE
语句来调用它们。对于其他 UDF,请使用 SQLCALL
语句。 -
启动 ASN 代理:
VALUES ASNCDC.ASNCDCSERVICES('start','asncdc');
前面的语句返回以下结果之一:
-
asncap is already running
-
start -->
在这种情况下,请
在终端窗口中输入指定的内容,如下例所示:
/database/config/db2inst1/sqllib/bin/asncap capture_schema=asncdc capture_server=SAMPLE &
-
-
将表格置于捕获模式。为要捕获的每个表调用以下语句。替换
MYSCHEMA
为包含要进入捕获模式的表的模式的名称。同样,替换MYTABLE
为要进入捕获模式的表的名称:CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE');
-
重新初始化 ASN 服务:
VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');
额外资源
Debezium Db2 管理 UDF 的参考表
3.2 Db2 捕获代理配置对服务器负载和延迟的影响
当数据库管理员为源表启用变更数据捕获时,捕获代理开始运行。代理从事务日志中读取新的更改事件记录,并将事件记录复制到捕获表中。在源表中提交更改与更改出现在相应更改表中的时间之间,始终存在一个小的延迟间隔。此延迟时间间隔表示源表中发生更改与 Debezium 可用于流式传输到 Apache Kafka 之间的差距。
理想情况下,对于必须快速响应数据更改的应用程序,您希望在源表和捕获表之间保持密切同步。您可能会想象,运行捕获代理以尽可能快地连续处理变更事件可能会提高吞吐量并减少延迟——在事件发生后尽快用新的事件记录填充变更表,几乎是实时的。但是,情况不一定如此。追求更直接的同步需要付出性能代价。每次更改代理向数据库查询新的事件记录时,都会增加数据库主机上的 CPU 负载。服务器上的额外负载会对整体数据库性能产生负面影响,并可能降低事务效率,
监视数据库指标很重要,这样您就可以知道数据库是否达到服务器不再支持捕获代理的活动级别的程度。如果您在运行捕获代理时遇到性能问题,请调整捕获代理设置以减少 CPU 负载。
3.3 Db2 捕获代理配置参数
在 Db2 上,该IBMSNAP_CAPPARMS
表包含控制捕获代理行为的参数。您可以调整这些参数的值以平衡捕获进程的配置,从而减少 CPU 负载并仍保持可接受的延迟水平。
有关如何配置 Db2 捕获代理参数的具体指导超出了本文档的范围。 |
在IBMSNAP_CAPPARMS
表中,以下参数对降低 CPU 负载的影响最大:
COMMIT_INTERVAL
-
指定捕获代理等待将数据提交到更改数据表的秒数。
-
较高的值会减少数据库主机上的负载并增加延迟。
-
默认值为
30
。
SLEEP_INTERVAL
-
指定捕获代理在到达活动事务日志末尾后等待开始新提交周期的秒数。
-
较高的值会减少服务器上的负载,并增加延迟。
-
默认值为
5
。
其他资源
-
有关捕获代理参数的更多信息,请参阅 Db2 文档。
四、部署
要部署 Debezium Db2 连接器,请安装 Debezium Db2 连接器存档,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。
先决条件
-
Apache ZooKeeper、Apache Kafka和Kafka Connect已安装。
-
安装 Db2 并为表启用捕获模式以准备要与 Debezium 连接器一起使用的数据库。
程序
-
从 Maven Central下载Debezium Db2 连接器插件存档。
-
将 JAR 文件提取到您的 Kafka Connect 环境中。
-
从 Maven Central下载 Db2 的JDBC 驱动程序,并将下载的驱动程序文件解压缩到包含 Debezium Db2 连接器 JAR 文件的目录(即 .
debezium-connector-db2-1.9.0.Final.jar
)。由于许可要求,Debezium Db2 连接器归档不包含 Debezium 连接到 Db2 数据库所需的 Db2 JDBC 驱动程序。要使连接器能够访问数据库,您必须将驱动程序添加到连接器环境中。
-
将包含 JAR 文件的目录添加到Kafka Connect 的plugin.path.
-
重新启动 Kafka Connect 进程以获取新的 JAR 文件。
如果您正在使用不可变容器,请参阅Debezium 的Apache ZooKeeper、Apache Kafka 和 Kafka Connect 容器映像,其中 Db2 连接器已经安装并准备好运行。
您还可以在 Kubernetes 和 OpenShift 上运行 Debezium。
下一步
-
配置连接器并将配置添加到您的 Kafka Connect 集群。
4.1 Db2 连接器配置示例
以下是连接器实例的配置示例,该实例从位于 192.168.99.100 的端口 50000 上的 Db2 服务器捕获数据,我们在逻辑上将其命名为fullfillment
. 通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium Db2 连接器。
您可以选择为数据库中的模式和表的子集生成事件。或者,您可以忽略、屏蔽或截断包含敏感数据、大于指定大小或您不需要的列。
{ "name": "db2-connector", "config": { "connector.class": "io.debezium.connector.db2.Db2Connector", "database.hostname": "192.168.99.100", "database.port": "50000", "database.user": "db2inst1", "database.password": "Password!", "database.dbname": "mydatabase", "database.server.name": "fullfillment", "table.include.list": "MYSCHEMA.CUSTOMERS", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.fullfillment" }}
1 | 向 Kafka Connect 服务注册时的连接器名称。 |
2 | 此 Db2 连接器类的名称。 |
3 | Db2 实例的地址。 |
4 | Db2 实例的端口号。 |
5 | Db2 用户的名称。 |
6 | Db2 用户的密码。 |
7 | 要从中捕获更改的数据库的名称。 |
8 | Db2 实例/集群的逻辑名称,形成一个命名空间,用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及Avro 连接器运行时对应 Avro 模式的命名空间用过的。 |
9 | Debezium 应捕获其更改的所有表的列表。 |
10 | 此连接器用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表。 |
11 | 连接器写入和恢复 DDL 语句的数据库历史主题的名称。本主题仅供内部使用,消费者不得使用。 |
有关可以为 Debezium Db2 连接器设置的配置属性的完整列表,请参阅Db2 连接器属性。
您可以使用命令将此配置发送POST
到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务:
-
连接到 Db2 数据库。
-
读取处于捕获模式的表的更改数据表。
-
流将事件记录更改为 Kafka 主题。
4.2 添加连接器配置
要开始运行 Db2 连接器,请创建一个连接器配置并将该配置添加到您的 Kafka Connect 集群。
先决条件
-
启用 Db2 复制以公开处于捕获模式的表的更改数据。
-
已安装 Db2 连接器。
程序
-
为 Db2 连接器创建配置。
-
使用Kafka Connect REST API将该连接器配置添加到您的 Kafka Connect 集群。
结果
连接器启动后,它会对连接器配置为捕获更改的 Db2 数据库表执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。