Debezium同步之MySQL数据到Kafka的同步
目录
一、概述
1.1 连接器的工作原理
二、设置 MySQL
2.1 创建用户
2.2 启用二进制日志
2.3 启用 GTID
2.4 配置会话超时
2.5 启用查询日志事件
三、部署
3.1 MySQL 连接器配置示例
3.2 添加连接器配置
一、概述
MySQL 有一个二进制日志(binlog),它按照提交到数据库的顺序记录所有操作。这包括对表模式的更改以及对表中数据的更改。MySQL 使用 binlog 进行复制和恢复。
Debezium MySQL 连接器读取 binlog,为行级INSERT
、、UPDATE
和DELETE
操作生成更改事件,并将更改事件发送到 Kafka 主题。客户端应用程序读取这些 Kafka 主题。
由于 MySQL 通常设置为在指定时间段后清除 binlog,因此 MySQL 连接器会对您的每个数据库执行初始一致快照。MySQL 连接器从创建快照的位置读取 binlog。
有关与此连接器兼容的 MySQL 数据库版本的信息,请参阅Debezium 版本概述。
1.1 连接器的工作原理
连接器支持的 MySQL 拓扑的概述对于规划您的应用程序很有用。为了优化配置和运行 Debezium MySQL 连接器,了解连接器如何跟踪表结构、公开模式更改、执行快照以及确定 Kafka 主题名称会很有帮助。
Debezium MySQL 连接器尚未在 MariaDB 上进行测试,但来自社区的多份报告表明该连接器已成功用于该数据库。计划在未来的 Debezium 版本中提供对 MariaDB 的官方支持。 |
二、设置 MySQL
在安装和运行 Debezium 连接器之前,需要执行一些 MySQL 设置任务。
2.1 创建用户
Debezium MySQL 连接器需要 MySQL 用户帐户。此 MySQL 用户必须对 Debezium MySQL 连接器捕获更改的所有数据库具有适当的权限。
先决条件
-
一个 MySQL 服务器。
-
SQL 命令的基本知识。
程序
-
创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
-
授予用户所需的权限:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
下表描述了权限。
如果使用不允许全局读取锁定的托管选项(例如 Amazon RDS 或 Amazon Aurora),则使用表级锁定来创建一致快照。在这种情况下,您还需要向 LOCK TABLES
您创建的用户授予权限。有关更多详细信息,请参阅快照。 -
最终确定用户的权限:
mysql> FLUSH PRIVILEGES;
1. 用户权限的描述
2.2 启用二进制日志
您必须为 MySQL 复制启用二进制日志记录。二进制日志记录复制工具的事务更新以传播更改。
先决条件
-
一个 MySQL 服务器。
-
适当的 MySQL 用户权限。
程序
-
检查该
log-bin
选项是否已打开:// for MySql 5.xmysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"FROM information_schema.global_variables WHERE variable_name='log_bin';// for MySql 8.xmysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"FROM performance_schema.global_variables WHERE variable_name='log_bin';
-
如果是
OFF
,请使用以下属性配置您的 MySQL 服务器配置文件,如下表所述:server-id = 223344log_bin = mysql-binbinlog_format = ROWbinlog_row_image = FULLexpire_logs_days = 10
-
通过再次检查 binlog 状态来确认您的更改:
// for MySql 5.xmysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"FROM information_schema.global_variables WHERE variable_name='log_bin';// for MySql 8.xmysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"FROM performance_schema.global_variables WHERE variable_name='log_bin';
2. MySQL binlog 配置属性的描述
2.3 启用 GTID
全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。尽管 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制并使您能够更轻松地确认主服务器和副本服务器是否一致。
GTID 在 MySQL 5.6.5 及更高版本中可用。有关更多详细信息,请参阅MySQL 文档。
先决条件
-
一个 MySQL 服务器。
-
SQL 命令的基本知识。
-
访问 MySQL 配置文件。
程序
-
启用
gtid_mode
:mysql> gtid_mode=ON
-
启用
enforce_gtid_consistency
:mysql> enforce_gtid_consistency=ON
-
确认更改:
mysql> show global variables like '%GTID%';
结果
+--------------------------+-------+| Variable_name | Value |+--------------------------+-------+| enforce_gtid_consistency | ON || gtid_mode | ON |+--------------------------+-------+
3.GTID 选项的描述
2.4 配置会话超时
当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置interactive_timeout
和来防止这种行为。wait_timeout
先决条件
-
一个 MySQL 服务器。
-
SQL 命令的基本知识。
-
访问 MySQL 配置文件。
程序
-
配置
interactive_timeout
:mysql> interactive_timeout=
-
配置
wait_timeout
:mysql> wait_timeout=
4.MySQL 会话超时选项的描述
MySQL 的文档-interactive_timeout描述
MySQL的文档-wait_timeout描述
2.5 启用查询日志事件
您可能希望查看SQL
每个 binlog 事件的原始语句。在 MySQL 配置文件中启用该binlog_rows_query_log_events
选项允许您执行此操作。
此选项在 MySQL 5.6 及更高版本中可用。
先决条件
-
一个 MySQL 服务器。
-
SQL 命令的基本知识。
-
访问 MySQL 配置文件。
程序
-
启用
binlog_rows_query_log_events
:mysql> binlog_rows_query_log_events=ON
binlog_rows_query_log_events
设置为启用/禁用对SQL
在 binlog 条目中包含原始语句的支持的值。-
ON
= 启用 -
OFF
= 禁用
-
三、部署
要部署 Debezium MySQL 连接器,您需要安装 Debezium MySQL 连接器存档,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。
先决条件
-
已安装Apache Zookeeper、Apache Kafka和Kafka Connect。
-
MySQL 服务器已安装并设置为与 Debezium 连接器一起使用。
程序
-
下载 Debezium MySQL 连接器插件。
-
将文件提取到您的 Kafka Connect 环境中。
-
将包含 JAR 文件的目录添加到Kafka Connect 的plugin.path.
-
配置连接器并将配置添加到您的 Kafka Connect 集群。
-
重新启动 Kafka Connect 进程以获取新的 JAR 文件。
如果您正在使用不可变容器,请参阅Debezium 的Apache Zookeeper、Apache Kafka、MySQL 和 Kafka Connect 容器映像,其中 MySQL 连接器已安装并准备运行。
您还可以在 Kubernetes 和 OpenShift 上运行 Debezium。
3.1 MySQL 连接器配置示例
以下是连接器实例的配置示例,该实例从位于 192.168.99.100 的端口 3306 上的 MySQL 服务器捕获数据,我们在逻辑上将其命名为fullfillment
. 通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium MySQL 连接器。
您可以选择为数据库中的模式和表的子集生成事件。或者,您可以忽略、屏蔽或截断包含敏感数据、大于指定大小或您不需要的列。
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "192.168.99.100", "database.port": "3306", "database.user": "debezium-user", "database.password": "debezium-user-pw", "database.server.id": "184054", "database.server.name": "fullfillment", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.fullfillment", "include.schema.changes": "true" }}
1 | 向 Kafka Connect 服务注册时的连接器名称。 |
2 | 连接器的类名。 |
3 | MySQL 服务器地址。 |
4 | MySQL 服务器端口号。 |
5 | 具有适当权限的 MySQL 用户。 |
6 | MySQL 用户的密码。 |
7 | 连接器的唯一 ID。 |
8 | MySQL 服务器或集群的逻辑名称。 |
9 | 指定服务器托管的数据库列表。 |
10 | 连接器用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表。 |
11 | 数据库历史主题的名称。本主题仅供内部使用,消费者不得使用。 |
12 | 指定连接器是否应为 DDL 更改生成事件并将它们发送到fulfillment 架构更改主题以供使用者使用的标志。 |
有关可以为 Debezium MySQL 连接器设置的配置属性的完整列表,请参阅MySQL 连接器配置属性。
您可以使用命令将此配置发送POST
到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务:
-
连接到 MySQL 数据库。
-
在捕获模式下读取表的更改数据表。
-
流将事件记录更改为 Kafka 主题。
3.2 添加连接器配置
要开始运行 MySQL 连接器,请配置连接器配置,并将配置添加到您的 Kafka Connect 集群。
先决条件
-
MySQL 设置为使用 Debezium 连接器。
-
Debezium MySQL 连接器已安装。
程序
-
为 MySQL 连接器创建配置。
-
使用Kafka Connect REST API将该连接器配置添加到您的 Kafka Connect 集群。
结果
连接器启动后,它会为连接器配置的 MySQL 数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。