Debezium同步之Cassandra数据到Kafka的同步
目录
一、前言
二、概述
三、设置 Cassandra
3.1 在节点上启用 CDC
3.2 在表上启用 CDC
四、部署连接器
4.1 示例配置
4.2 监控
一、前言
Cassanadra 连接器可以监控 Cassandra 集群并记录所有行级更改。连接器必须本地部署在 Cassandra 集群中的每个节点上。连接器第一次连接到 Cassandra 节点时,它会对所有键空间中所有启用 CDC 的表执行快照。连接器还将读取写入 Cassandra 提交日志的更改并生成相应的插入、更新和删除事件。每个表的所有事件都记录在一个单独的 kafka 主题中,应用程序和服务可以轻松地在其中使用它们。
有关与此连接器兼容的 Cassandra 版本的信息,请参阅Debezium 版本概述。
二、概述
Cassandra 是一个开源的 NoSQL 数据库。与大多数数据库类似,Cassandra 的写入路径从立即将更改记录到其提交日志开始。提交日志位于每个节点的本地,记录对该节点的每次写入。
从 Cassandra 3.0 开始,引入了变更数据捕获 (CDC) 功能。可以通过设置 table 属性在表级别启用 CDC 功能cdc=true
,之后任何包含启用 CDC 的表的数据的提交日志将被移动到cassandra.yaml
丢弃时指定的 CDC 目录。
Cassandra 连接器驻留在每个 Cassandra 节点上并监视cdc_raw
目录的更改。它在检测到所有本地提交日志段时对其进行处理,为提交日志中的每个行级插入、更新和删除操作生成一个更改事件,在单独的 Kafka 主题中发布每个表的所有更改事件,最后删除从cdc_raw
目录提交日志。最后一步很重要,因为一旦启用 CDC,Cassandra 本身就无法清除提交日志。如果cdc_free_space_in_mb
填满,写入启用 CDC 的表将被拒绝。
连接器可以容忍故障。当连接器读取提交日志并产生事件时,它会记录每个提交日志段的文件名和位置以及每个事件。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),则在重新启动时它会继续读取上次停止的提交日志。这包括快照:如果在连接器停止时快照未完成,则在重新启动时它将开始一个新的快照。稍后我们将讨论出现问题时连接器的行为方式。
Cassandra 与其他 Debezium 连接器不同,因为它不是在 Kafka Connect 框架之上实现的。相反,它是一个单一的 JVM 进程,旨在驻留在每个 Cassandra 节点上,并通过 Kafka 生产者将事件发布到 Kafka。 |
Cassandra 连接器当前不支持以下功能。由这些功能中的任何一个引起的更改都将被忽略:
|
三、设置 Cassandra
在可以使用 Debezium Cassandra 连接器监控 Cassandra 集群中的更改之前,必须在节点级别和表级别启用 CDC。
3.1 在节点上启用 CDC
要启用 CDC,请更新以下 CDC 配置cassandra.yaml
:
cdc_enabled: true
其他 CDC 配置具有以下默认值:
cdc_raw_directory: $CASSANDRA_HOME/data/cdc_rawcdc_free_space_in_mb: 4096cdc_free_space_check_interval_ms: 250
-
cdc_enabled
在节点范围内启用或禁用 CDC 操作 -
cdc_raw_directory
在刷新所有相应的内存表后确定要移动的提交日志段的目标 -
cdc_free_space_in_mb
是分配用于存储提交日志段的最大容量,默认为最小值 4096 MB 和 1/8 卷空间。 -
cdc_free_space_check_interval_ms
是我们重新计算占用空间的频率,cdc_raw_directory
以防止在满负荷时不必要地消耗 CPU 周期。
3.2 在表上启用 CDC
在 Cassandra 节点上启用 CDC 后,还必须通过 CREATE TABLE 或 ALTER TABLE 命令为 CDC 显式启用每个表。例如:
CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;ALTER TABLE foo WITH cdc=true;
四、部署连接器
Cassandra 连接器应该部署在 Cassandra 集群中的每个 Cassandra 节点上。Cassandra 连接器 Jar 文件采用 cdc 配置 (.properties) 文件。请参阅示例配置以供参考。
4.1 示例配置
以下是用于在本地运行和测试 Cassandra 连接器的示例 .properties 配置文件:
connector.name=test_connectorcommit.log.relocation.dir=/Users/test_user/debezium-connector-cassandra/test_dir/relocation/http.port=8000cassandra.config=/usr/local/etc/cassandra/cassandra.yamlcassandra.hosts=127.0.0.1cassandra.port=9042kafka.producer.bootstrap.servers=127.0.0.1:9092kafka.producer.retries=3kafka.producer.retry.backoff.ms=1000kafka.topic.prefix=test_prefixkey.converter=io.confluent.connect.avro.AvroConverterkey.converter.schema.registry.url: http://localhost:8081value.converter=io.confluent.connect.avro.AvroConvertervalue.converter.schema.registry.url: http://localhost:8081offset.backing.store.dir=/Users/test_user/debezium-connector-cassandra/test_dir/snapshot.consistency=ONEsnapshot.mode=ALWAYSlatest.commit.log.only=true
4.2 监控
Cassandra 连接器具有对 JMX 指标的内置支持。Cassandra 驱动程序还发布了一些关于驱动程序活动的指标,这些指标可以通过 JMX 进行监控。连接器有两种类型的指标。快照指标可帮助您监控快照活动,并且在连接器执行快照时可用。Binlog 指标可帮助您在连接器读取 Cassandra 提交日志时监控进度和活动。