> 文档中心 > springboot整合canal(超详细)

springboot整合canal(超详细)


一、canal介绍

binlog是mysql的二进制日志,对于操作数据库的语句,都以此形式保存。Canal是阿里MySQL数据库Binlog的增量订阅&消费组件 。基于数据库Binlog可以监控数据库数据的变化进而用于数据同步等业务。

二、服务端部署

服务端链接: https://github.com/alibaba/canal/releases
解压zip,目录如下:
springboot整合canal(超详细)
conf -> example -> instance.properties
日志文件名称和记录位置,如下图所示,修改数据库连接地址、日志文件、连接的用户名和密码

show master status(数据库查询日志文件命令)
(详细配置可以从官网查看,仅记录使用步骤)
在这里插入图片描述

三、客户端使用

1、POM文件

 <!--canal--> <dependency>     <groupId>com.alibaba.otter</groupId>     <artifactId>canal.client</artifactId>     <version>1.1.3</version> </dependency>

2、连接配置

canal-monitor-mysql:  hostname: localhost  port: 11111  tableName: fas4.0

具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。
需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意

import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;import com.haiot.service.CorpsUploadService;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.protocol.Message;/** * @program: fas-haiot-interface * @description: 市级平台数据上传接口相关实现 * @author: liuAnmin * @create: 2021-03-22 15:52 **/@Component@Slf4jpublic class CanalUtil {    @Resource    CorpsUploadService corpsUploadService;    @Value("${canal-monitor-mysql.hostname}")    String canalMonitorHost;    @Value("${canal-monitor-mysql.port}")    Integer canalMonitorPort;    @Value("${canal-monitor-mysql.tableName}")    String canalMonitorTableName;    private final static int BATCH_SIZE = 10000;    /**     * 启动服务     */    @Async("TaskPool")    public void startMonitorSQL() { while (true) {     CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");     try {  //打开连接  connector.connect();  log.info("数据库检测连接成功!" + canalMonitorTableName);  //订阅数据库表,全部表q  connector.subscribe(canalMonitorTableName + "\\..*");  //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿  connector.rollback();  while (true) {      // 获取指定数量的数据      Message message = connector.getWithoutAck(BATCH_SIZE);      long batchId = message.getId();      int size = message.getEntries().size();      if (batchId == -1 || size == 0) {      } else {   handleDATAChange(message.getEntries());      }      // 提交确认      connector.ack(batchId);  }     } catch (Exception e) {  e.printStackTrace();  log.error("成功断开监测连接!尝试重连");     } finally {  connector.disconnect();  //防止频繁访问数据库链接: 线程睡眠 10秒  try {      Thread.sleep(10 * 1000);  } catch (InterruptedException e) {      e.printStackTrace();  }     } }    }    /**     * 打印canal server解析binlog获得的实体类信息     */    private void handleDATAChange(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) {     if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {  continue;     }     //RowChange对象,包含了一行数据变化的所有特征     CanalEntry.RowChange rowChage;     try {  rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());     } catch (Exception e) {  throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);     }     CanalEntry.EventType eventType = rowChage.getEventType();     log.info("Canal监测到更新:【{}】", entry.getHeader().getTableName());     switch (eventType) {  /**   * 删除操作   */  case DELETE:      corpsUploadService.DeleteOperateToCityInterface(rowChage, entry);      break;  /**   * 添加操作   */  case INSERT:      corpsUploadService.InsertOperateToCityInterface(rowChage, entry);      break;  /**   * 更新操作   */  case UPDATE:      corpsUploadService.UpdateOperateToCityInterface(rowChage, entry);      break;  default:      break;     } }    }}