> 文档中心 > mysql 二进制订阅神器 alibaba 的 Canal

mysql 二进制订阅神器 alibaba 的 Canal


第 1 章 Canal 入门

1.1 什么是

mysql 二进制订阅神器 alibaba 的 Canal

Canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所 以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试 基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。 Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得 的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2MySQL 的 Binlog

1.2.1 什么是 Binlog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除 了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:

  • 其一:MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves 来达到 Master-Slave 数据一致的目的。

  • 其二:自然就是数据恢复了,通过使用 MySQL Binlog 工具来使恢复数据。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件。

1.2.2 Binlog 的分类

MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配 置 binlog_format= statement|mixed|row。三种格式的区别:

1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志 进行恢复,由于执行时间不同可能产生的数据就不同。

  • 优点:节省空间。
  • 缺点:有可能造成数据不一致。

2)row:行级, binlog 会记录每次操作后每行记录的变化。

  • 优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录 执行后的效果。

  • 缺点:占用较大空间。

3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理

  • 优点:节省空间,同时兼顾了一定的一致性。

  • 缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对 binlog 的监控的情况都不方便。

    综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。

1.3 工作原理

1.3.1 MySQL主备复制原理

mysql 二进制订阅神器 alibaba 的 Canal

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

1.3.2 canal 工作原理

mysql 二进制订阅神器 alibaba 的 Canal

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

1.4 使用场景

1)原始场景: 阿里 Otter 中间件的一部分

Otter 是阿里用于进行异地数据库之间的同步框架,Canal 是其中一部分。

mysql 二进制订阅神器 alibaba 的 Canal

2)常见场景 1:更新缓存

mysql 二进制订阅神器 alibaba 的 Canal

3)常见场景 2:抓取业务表的新增变化数据,用于制作实时统计

第 2 章 MySQL 的准备

2.1 创建数据表

CREATE TABLE user_info(`id` VARCHAR(255),`name` VARCHAR(255),`sex` VARCHAR(255));

2.2 修改配置文件开启 Binlog

$ sudo vim /etc/my.cnfserver-id=1## 忽略表replicate-wild-ignore-table=mysql.*replicate-wild-ignore-table=sys.*log-bin=mysql-binbinlog_format=rowbinlog-do-db=admin4j

注意:binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库,如果不配置 则表示所有数据库均开启 Binlog

2.3 重启 MySQL 使配置生效

sudo systemctl restart mysqld

到/var/lib/mysql 目录下查看初始文件大小 154

在这里插入图片描述

2.4 测试 Binlog 是否开启

1)插入数据

INSERT INTO user_info VALUES('1001','zhangsan','male');

2)再次到/var/lib/mysql 目录下,查看 index 文件的大小

mysql 二进制订阅神器 alibaba 的 Canal

2.5 mysql创建canal用户

在 MySQL 中执行

mysql> set global validate_password_length=4;mysql> set global validate_password_policy=0;mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;mysql> FLUSH PRIVILEGES;

第 3 章 Canal 的下载和安装

3.1 下载并解压 Jar 包

  • 下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.5 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
  • 解压缩
mkdir /tmp/canaltar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

注意:canal 解压后是分散的,我们在指定解压目录的时候需要将 canal 指定上

  • 解压完成后,进入 /tmp/canal 目录,可以看到如下结构

mysql 二进制订阅神器 alibaba 的 Canal

3.2 修改 canal.properties 的配置

vi conf/canal.properties
## mysql serverIdcanal.instance.mysql.slaveId = 1234#position info,需要改成自己的数据库信息canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name =#canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的数据库信息canal.instance.dbUsername = canal  canal.instance.dbPassword = canalcanal.instance.defaultDatabaseName =canal.instance.connectionCharset = UTF-8#table regexcanal.instance.filter.regex = .\*\\\\..\*

说明:这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111,修改 canal 的 输出 model,默认 tcp,改为输出到 kafka

多实例配置如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务 中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。

默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的 canal.destinations=实例 1,实例 2,实例 3。

mysql 二进制订阅神器 alibaba 的 Canal

3.3 修改 instance.properties

我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在 conf/example 目录下

-rwxrwxrwx 1 root root 2106 4月  19 2021 instance.properties[root@node13 example]# pwd/home/canal/conf/example[root@node13 example]# vi instance.properties
  • 1)配置 MySQL 服务器地址
 mysql serverId , v1.0.26+ will autoGen  canal.instance.mysql.slaveId=20# enable gtid use true/falsecanal.instance.gtidon=false# position info#position info,需要改成自己的数据库信息canal.instance.master.address=192.168.1.13:3306
  • 2)配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
# username/password,需要改成自己的数据库信息canal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8canal.instance.defaultDatabaseName =test# enable druid Decrypt database passwordcanal.instance.enableDruid=false
  • canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1

  • 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

    1. 启动
    sh bin/startup.sh
    1. 查看 instance 的日志
vi logs/canal/canal.log

mysql 二进制订阅神器 alibaba 的 Canal

    1. 查看 server 日志
vi logs/example/example.log

mysql 二进制订阅神器 alibaba 的 Canal

    1. 关闭
sh bin/stop.sh

3.4 其他方式

  • canal 的 docker 模式快速启动,参考:Docker QuickStart
  • canal 链接 Aliyun RDS for MySQL,参考:Aliyun RDS QuickStart
  • canal 消息投递给 kafka/RocketMQ,参考:Canal-Kafka-RocketMQ-QuickStart

第 4 章 实时监控测试

4.1 TCP 模式测试

新建工程,映入依赖

     com.alibaba.otter     canal.client     1.1.5       com.alibaba.otter     canal.protocol     1.1.5 

4.1.2 通用监视类 –CanalClient

1)Canal 封装的数据结构

mysql 二进制订阅神器 alibaba 的 Canal

  1. CanalClient(java 代码)
public class CanalClient {    public static void main(String[] args) throws InvalidProtocolBufferException { //1.获取 canal 连接对象 CanalConnector canalConnector =  CanalConnectors.newSingleConnector(new   InetSocketAddress("192.168.1.13", 11111), "example", "", ""); int batchSize = 100; //2.获取连接 canalConnector.connect(); //3.指定要监控的数据库 canalConnector.subscribe("ces_bi.*"); while (true) {     //4.获取 Message     Message message = canalConnector.getWithoutAck(batchSize);     long batchId = message.getId();     List entries = message.getEntries();     if (batchId == -1 || entries.size() <= 0) {  System.out.println("没有数据,休息一会");  try {      Thread.sleep(1000);  } catch (InterruptedException e) {      e.printStackTrace();  }     } else {  printEntry(message.getEntries());  canalConnector.ack(batchId); // 提交确认  // connector.rollback(batchId); // 处理失败, 回滚数据     } }    }    /     * 打印数据     *     * @param entries     */    private static void printEntry(List entries) throws InvalidProtocolBufferException { for (CanalEntry.Entry entry : entries) {     // 获取表名     String tableName = entry.getHeader().getTableName();     //  Entry 类型     CanalEntry.EntryType entryType = entry.getEntryType();     //  判断 entryType 是否为 ROWDATA     if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {  //  序列化数据  ByteString storeValue = entry.getStoreValue();  //  反序列化  CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);  // 获取事件类型  CanalEntry.EventType eventType = rowChange.getEventType();  // 获取具体的数据  List rowDatasList =   rowChange.getRowDatasList();  // 遍历并打印数据  for (CanalEntry.RowData rowData : rowDatasList) {      List beforeColumnsList =rowData.getBeforeColumnsList();      JSONObject beforeData = new JSONObject();      for (CanalEntry.Column column :beforeColumnsList) {   beforeData.put(column.getName(), column.getValue());      }      JSONObject afterData = new JSONObject();      List afterColumnsList =rowData.getAfterColumnsList();      for (CanalEntry.Column column :afterColumnsList) {   afterData.put(column.getName(),    column.getValue());      }      System.out.println("TableName:" + tableName+",EventType:" + eventType +",After:" + beforeData +",After:" + afterData);  }     } }    }}

4.1.3 测试 CanalClient

4.1.3.1 执行更新语句

UPDATE `ces_bi`.`user_info` SET `name` = 'lishi' WHERE `id` = '1001' AND `name` = 'zhangsan' AND `sex` = 'male' LIMIT 1

打印结果

TableName:user_info,EventType:UPDATE,After:{"sex":"male","name":"zhangsan","id":"1001"},After:{"sex":"male","name":"lishi","id":"1001"}

4.1.3.2 执行插入语句

INSERT INTO `ces_bi`.`user_info`(`id`, `name`, `sex`) VALUES ('1002', 'wangwu', 'f')

打印结果

TableName:user_info,EventType:INSERT,After:{},After:{"sex":"f","name":"wangwu","id":"1002"}

4.1.3.3 执行删除语句

DELETE FROM `ces_bi`.`user_info` WHERE `id` = '1002' AND `name` = 'wangwu' AND `sex` = 'f' LIMIT 1

打印结果

TableName:user_info,EventType:DELETE,After:{"sex":"f","name":"wangwu","id":"1002"},After:{}

4.1.3.4 官方例子

如果需要更详细的exmpale例子,请下载canal当前最新源码包,里面有个example工程,谢谢.

  • Simple客户端例子:SimpleCanalClientTest
  • Cluster客户端例子:ClusterCanalClientTest

4.2 Kafka 模式测试

1)修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka

# ...# 可选项: tcp(默认), kafka,RocketMQ,rabbitmq,pulsarmqcanal.serverMode = kafka# ...# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)canal.mq.canalBatchSize = 50# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时canal.mq.canalGetTimeout = 100# 是否为flat json格式对象canal.mq.flatMessage = false

2)修改 Kafka 集群的地址

## MQ canal.mq.servers = a1:9092,a2:9092,a3:9092

3)修改 instance.properties 输出到 Kafka 的主题以及分区数

#  按需修改成自己的数据库信息#...canal.instance.master.address=192.168.1.20:3306# username/password,数据库的用户名和密码...canal.instance.dbUsername = canalcanal.instance.dbPassword = canal...# mq configcanal.mq.topic=canal_test# 针对库名或者表名发送动态topic#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*canal.mq.partition=0# hash partition config#canal.mq.partitionsNum=3#库名.表名: 唯一主键,多个表之间用逗号分隔#canal.mq.partitionHash=mytest.person:id,mytest.role:id#

注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打 乱 binlog 的顺序 , 如 果 要 提 高 并 行 度 , 首 先 设 置 kafka 的 分 区 数 >1, 然 后 设 置 canal.mq.partitionHash 属性

4)重启 Canal

5)启动 Kafka 消费客户端测试,查看消费情况

in/kafka-console-consumer.sh --bootstrap-server a1:9092 --topic canal_test

6)向 MySQL 中插入数据后查看消费者控制台

# 插入数据INSERT INTO user_info VALUES('1001','zhangsan','male'),('1002','lisi','female');

7) Kafka 消费者控制台

{    "data":[ {     "id":"1001",     "name":"zhangsan",     "sex":"male" }, {     "id":"1002",     "name":"lisi",     "sex":"female" }    ],    "database":"admin4j",    "es":1639360729000,    "id":1,    "isDdl":false,    "mysqlType":{ "id":"varchar(255)", "name":"varchar(255)", "sex":"varchar(255)"    },    "old":null,    "sql":"",    "sqlType":{ "id":12, "name":12, "sex":12    },    "table":"user_info",    "ts":1639361038454,    "type":"INSERT"}

第6章 canal HA机制设计

canal的ha(双机集群)分为两部分,canal server和canal client分别有对应的ha实现:
●canal server:为了减少对mysql dump的请求,不同server上的实例(instance)要求同一时间只能有一个处于running,其他的处于standby状态。
●canal client:为了保证有序性,一份实例(instance)同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),后续我会继续记录zookeeper学习过程。

Canal Server HA架构原理:

mysql 二进制订阅神器 alibaba 的 Canal
流程步骤

  • 1.canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)。
  • 2.创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。
  • 3.一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。
  • 4.canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。
    注:canal client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制。

第6章 Canal-HA模式搭建

6.1 环境准备

(1)启动zookeeper

(2)启动kafka

Canal架构

mysql 二进制订阅神器 alibaba 的 Canal

下载安装 https://github.com/alibaba/canal/releases

6.2 canal-server

在a1和a2上安装canal-server

在两台节点机器都安装部署:

 mkdir -p /opt/module/canal-server tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/module/canal-server/

在a1和上的canal-server上各配置如下:

1)添加zookeeper地址,将file-instance.xml注释,解开default-instance.xml注释。修改两个canal-server节点的配置

vim conf/canal.properties  ##默认端口号为1111,可以修改  canal.zkServers =a1:2181,a2:2181,hadoop103:2181  #canal.instance.global.spring.xml = classpath:spring/file-instance.xml  canal.instance.global.spring.xml = classpath:spring/default-instance.xml

2)在进入conf/example目录修改instance配置,instance.properties是针对要追踪的mysql的实例配置

[kris@a1 canal-server] vim conf/example/instance.properties    //a1机器配置  canal.instance.mysql.slaveId = 100   //不能与mysql的server-id重复  canal.instance.master.address = a1:3306[kris@a2 canal-server] vim conf/example/instance.properties   //a2机器的配置  canal.instance.mysql.slaveId = 101  canal.instance.master.address = a1:3306

3)启动两台canal-server

[kris@a1 canal-server]$ sh bin/startup.sh [kris@a2 canal-server]$ sh bin/startup.sh 

查看日志,无报错便启动成功,一台节点有日志,一台节无日志

tail -f logs/example/example.log

通过zookeeper查看当前工作节点,可以看出当前活动节点为101

[kris@a1 canal-server]$ /opt/module/zookeeper-3.4.10/bin/zkCli.sh   [zk: localhost:2181(CONNECTED) 0] ls /  [kafa_2.11, zookeeper, yarn-leader-election, hadoop-ha, otter, rmstore]  [zk: localhost:2181(CONNECTED) 1] get /otter/canal/destinations/example/running    {"active":true,"address":"192.168.1.101:11111"}

关闭canal

[kris@a1 canal-server]$ sh bin/stop.sh [kris@a2 canal-server]$ sh bin/stop.sh

6.3 对接Kafka

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

  • kafka: https://github.com/apache/kafka
  • RocketMQ : https://github.com/apache/rocketmq

(1)修改instance配置文件 (两台节点)

[kris@a1 canal-server]$ vim conf/example/instance.properties  canal.mq.topic=test  ##将数据发送到指定的topic  canal.mq.partition=0

(2)修改canal.properties(两台节点)

[kris@a1 canal-server]$ vim conf/canal.propertiescanal.serverMode = kafkacanal.mq.servers = a1:9092,a2:9092,a3:9092canal.mq.retries = 0canal.mq.batchSize = 16384canal.mq.maxRequestSize = 1048576canal.mq.lingerMs = 100canal.mq.bufferMemory = 33554432canal.mq.canalBatchSize = 50canal.mq.canalGetTimeout = 100canal.mq.flatMessage = truecanal.mq.compressionType = nonecanal.mq.acks = all#canal.mq.properties. =canal.mq.producerGroup = test

(3)启动canal

[kris@a1 canal-server]$ sh bin/startup.sh[kris@a2 canal-server]$ sh bin/startup.sh

(4)启动kafka消费者监听topic test

[kris@hadoop103 ~]$ /opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server a1:9092 --topic test

(5)向MySql数据库插入条数据 测试

插入: id = 4,name = smile,sex = femal

修改: id = 4,name = smile,sex = male

删除:id = 4的这条数据。

Kafka中接收到的数据格式如下:

[kris@a1 ~]$ /opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server a1:9092 --topic test
{"data":[{"id":"4","name":"smile","sex":"female"}],    "database":"company","es":1598197042000,"id":1,"isDdl":false,    "mysqlType":{"id":"int(4)","name":"varchar(255)","sex":"varchar(255)"},    "old":null,"pkNames":["id"],"sql":"",    "sqlType":{"id":4,"name":12,"sex":12},    "table":"staff","ts":1598197042255,    "type":"INSERT"}
{"data":[{"id":"4","name":"smile","sex":"male"}],    "database":"company","es":1598197317000,"id":2,"isDdl":false,    "mysqlType":{"id":"int(4)","name":"varchar(255)","sex":"varchar(255)"},    "old":[{"sex":"female"}],    "pkNames":["id"],"sql":"",    "sqlType":{"id":4,"name":12,"sex":12},    "table":"staff","ts":1598197318118,    "type":"UPDATE"}
{"data":[{"id":"4","name":"smile","sex":"male"}],    "database":"company","es":1598197330000,"id":3,"isDdl":false,    "mysqlType":{"id":"int(4)","name":"varchar(255)","sex":"varchar(255)"},    "old":null,"pkNames":["id"],"sql":"",    "sqlType":{"id":4,"name":12,"sex":12},    "table":"staff","ts":1598197330144,    "type":"DELETE"}

6.4 canal-admin安装

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

6.4.1 准备

canal-admin的限定依赖:

  1. MySQL,用于存储配置和节点等相关数据
  2. canal版本,要求>=1.1.5 (需要依赖canal-server提供面向admin的动态运维管理接口)

6.4.2 部署

docker 运行

docker pull canal/canal-admin:v1.1.5
docker run -itd -p 8089:8089 -e server.port=8089 \  -e spring.datasource.address=xxx \  -e spring.datasource.database=xx \  -e spring.datasource.username=xx   -e spring.datasource.password=xx canal/canal-admin:v1.1.5
  1. 下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.5 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
  1. 解压缩
mkdir /tmp/canal-admintar zxvf canal.admin-$version.tar.gz  -C /tmp/canal-admin
  1. 配置修改
vi conf/application.yml
server:  port: 8089spring:  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8spring.datasource:  address: 127.0.0.1:3306  database: canal_manager  username: canal  password: canal  driver-class-name: com.mysql.jdbc.Driver  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false  hikari:    maximum-pool-size: 30    minimum-idle: 1canal:  adminUser: admin  adminPasswd: admin
  1. 初始化元数据库
mysql -h127.1 -uroot -p# 导入初始化SQL> source conf/canal_manager.sql

a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化

b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql

  1. 启动
sh bin/startup.sh

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

mysql 二进制订阅神器 alibaba 的 Canal

  1. canal-server端配置

使用canal_local.properties的配置覆盖canal.properties

# register ipcanal.register.ip =# canal admin configcanal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441# admin auto registercanal.admin.register.auto = truecanal.admin.register.cluster =

启动admin-server即可。

或在启动命令中使用参数:sh bin/startup.sh local 指定配置

第7章 Canal-admin使用

7.1 配置启动

(1)修改配置填写Canal-damin地址 (两台节点都改)[kris@a1 canal-server]$ vim conf/canal.propertiescanal.admin.manager = a1:8089# admin auto registercanal.admin.register.auto = truecanal.admin.register.cluster =(2)启动Canal-admin[kris@a1 canal-admin]$ sh bin/startup.sh(3)启动Canal-server[kris@a1 canal-server]$ sh bin/startup.sh [kris@a2 canal-server]$ sh bin/startup.sh (4)登录页面查看 a1:8089

7.2 集群运维

mysql 二进制订阅神器 alibaba 的 Canal

点击主配置后可以对canal.properies设计出一份全局的配置集群共享,可以创建多个集群

7.3 Server运维

配置项:

  • 所属集群,可以选择为单机 或者 集群。一般单机Server的模式主要用于一次性的任务或者测试任务
  • Server名称,唯一即可,方便自己记忆
  • Server Ip,机器ip
  • admin端口,canal 1.1.5版本新增的能力,会在canal-server上提供远程管理操作,默认值11110
  • tcp端口,canal提供netty数据订阅服务的端口
  • metric端口, promethues的exporter监控数据端口 (未来会对接监控)

mysql 二进制订阅神器 alibaba 的 Canal

mysql 二进制订阅神器 alibaba 的 Canal

mysql 二进制订阅神器 alibaba 的 Canal

7.4 Instance运维

创建instance

mysql 二进制订阅神器 alibaba 的 Canal

mysql 二进制订阅神器 alibaba 的 Canal

mysql 二进制订阅神器 alibaba 的 Canal

Canal和Zookeeper对应节点的关系/otter/canal:canal的根目录/otter/canal/cluster:整个canal server的集群列表/otter/canal/destinations:destination的根目录/otter/canal/destinations/example/running:服务端当前正在提供服务的running节点/otter/canal/destinations/example/cluster:针对某个destination的工作集群列表/otter/canal/destinations/example/1001/running:客户端当前正在读取的running节点/otter/canal/destinations/example/1001/cluster:针对某个destination的客户端列表/otter/canal/destinations/example/1001/cursor:客户端读取的position信息