> 文档中心 > Debezium同步之MongoDB数据到Kafka的同步

Debezium同步之MongoDB数据到Kafka的同步

目录

   一、前言

   二、概述

   三、设置 MongoDB

3.1 云上的 MongoDB

   四、部署

4.1 MongoDB 连接器配置示例

4.2 添加连接器配置


一、前言

Debezium 的 MongoDB 连接器跟踪 MongoDB 副本集或 MongoDB 分片集群以查找数据库和集合中的文档更改,并将这些更改记录为 Kafka 主题中的事件。连接器自动处理分片集群中分片的添加或删除、每个副本集成员资格的更改、每个副本集中的选举以及等待通信问题的解决。

有关与此连接器兼容的 MongoDB 版本的信息,请参阅Debezium 版本概述。

   二、概述

MongoDB 的复制机制提供了冗余和高可用性,是在生产环境中运行 MongoDB 的首选方式。MongoDB 连接器捕获副本集或分片集群中的更改。

MongoDB副本集由一组服务器组成,这些服务器都具有相同数据的副本,并且复制可确保客户端对副本集节点上的文档所做的所有更改都正确应用于另一个副本集的服务器,称为辅助节点。MongoDB 复制的工作原理是让主节点在其oplog(或操作日志)中记录更改,然后每个辅助节点读取主节点的 oplog 并按顺序将所有操作应用于自己的文档。将新服务器添加到副本集时,该服务器首先执行快照主节点上的所有数据库和集合,然后读取主节点的 oplog 以应用自快照开始以来可能已进行的所有更改。当这个新服务器赶上主服务器 oplog 的尾部时,它成为辅助服务器(并且能够处理查询)。

MongoDB 连接器支持两种不同的模式来捕获由capture.mode选项控制的更改:

  • 基于oplog

  • 基于更改流

   三、设置 MongoDB

MongoDB 连接器使用 MongoDB 的 oplog/change 流来捕获更改,因此连接器仅适用于 MongoDB 副本集或分片集群,其中每个分片都是一个单独的副本集。有关设置副本集或分片集群的信息,请参阅 MongoDB 文档。此外,请务必了解如何使用副本集启用访问控制和身份验证。

您还必须有一个具有适当角色的 MongoDB 用户来读取admin可以读取 oplog 的数据库。此外,用户还必须能够读取分片config集群的配置服务器中的数据库,并且必须具有listDatabases特权操作。当使用更改流(默认)时,用户还必须具有集群范围的权限操作findchangeStream.

3.1 云上的 MongoDB

您可以将 MongoDB 的 Debezium 连接器与MongoDB Atlas一起使用。将 Debezium 连接到 MongoDB Atlas 时,启用其中之一capture modes基于更改流,而不是 oplog。请注意,MongoDB Atlas 仅支持通过 SSL 的安全连接,即+mongodb.ssl.enabled连接器选项必须设置为true.

   四、部署

要部署 Debezium MongoDB 连接器,请安装 Debezium MongoDB 连接器存档,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件

  • 已安装Apache Zookeeper、Apache Kafka和Kafka Connect。

  • MongoDB 已安装并设置为与 Debezium 连接器一起使用。

程序

  1. 下载 连接器的插件存档,

  2. 将 JAR 文件提取到您的 Kafka Connect 环境中。

  3. 将包含 JAR 文件的目录添加到Kafka Connect 的plugin.path.

  4. 重新启动 Kafka Connect 进程以获取新的 JAR 文件。

如果您正在使用不可变容器,请参阅Debezium 的Apache Zookeeper、Apache Kafka 和 Kafka Connect 容器映像,其中 MongoDB 连接器已安装并准备好运行。

您还可以在 Kubernetes 和 OpenShift 上运行 Debezium。

Debezium教程将引导您使用这些图像,这是了解 Debezium 的好方法。

4.1 MongoDB 连接器配置示例

以下是连接器实例的配置示例,该实例从位于rs0192.168.99.100 的端口 27017 的 MongoDB 副本集中捕获数据,我们在逻辑上将其命名为fullfillment. 通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium MongoDB 连接器。

您可以选择为特定的 MongoDB 副本集或分片集群生成事件。或者,您可以过滤掉不需要的集合。

{  "name": "inventory-connector",   "config": {    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",     "mongodb.hosts": "rs0/192.168.99.100:27017",     "mongodb.name": "fullfillment",     "collection.include.list": "inventory[.]*"   }}

1 当我们向 Kafka Connect 服务注册连接器时的名称。
2 MongoDB 连接器类的名称。
3 用于连接到 MongoDB 副本集的主机地址。
4 MongoDB 副本集的逻辑名称,形成生成事件的命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及 Avro 时相应 Avro 模式的命名空间使用转换器。
5 与要监视的所有集合的集合命名空间(例如,.)匹配的正则表达式列表。这是可选的。

有关可以为 Debezium MongoDB 连接器设置的配置属性的完整列表,请参阅MongoDB 连接器配置属性。

您可以使用命令将此配置发送POST到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务:

  • 连接到 MongoDB 副本集或分片集群。

  • 为每个副本集分配任务。

  • 如有必要,执行快照。

  • 读取 oplog/change 流。

  • 流将事件记录更改为 Kafka 主题。

4.2 添加连接器配置

要开始运行 Debezium MongoDB 连接器,请创建一个连接器配置,并将该配置添加到您的 Kafka Connect 集群。

先决条件

  • MongoDB 设置为使用 Debezium 连接器。

  • Debezium MongoDB 连接器已安装。

程序

  1. 为 MongoDB 连接器创建配置。

  2. 使用Kafka Connect REST API将该连接器配置添加到您的 Kafka Connect 集群。

结果

连接器启动后,它会完成以下操作:

  • 对 MongoDB 副本集中的集合执行一致的快照。

  • 读取副本集的 oplogs/change 流。

  • 为每个插入、更新和删除的文档生成更改事件。

  • 流将事件记录更改为 Kafka 主题。