> 技术文档 > MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!_log4j与elasticsearch同步

MySQL数据实时同步至Elasticsearch的高效方案:Java实现+源码解析,一文搞定!_log4j与elasticsearch同步


引言:为什么需要实时同步?

MySQL擅长事务处理,而Elasticsearch(ES)则专注于搜索与分析。将MySQL数据实时同步到ES,可以充分发挥两者的优势,例如:

  • 构建高性能搜索服务

  • 实时数据分析与大屏展示

  • 提升复杂查询效率

传统方案(如定时全量同步)存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步,并提供完整Java代码及深度源码解析。

一、技术选型与核心原理

1.1 核心组件
  • MySQL Binlog:MySQL的二进制日志,记录所有数据变更事件(增删改)。

  • Canal/OpenReplicator:解析Binlog的工具(本文使用轻量级mysql-binlog-connector-java)。

  • Elasticsearch High Level REST Client:ES官方Java客户端,用于数据写入。

1.2 架构流程图
MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch

二、环境准备与配置

2.1 MySQL开启Binlog
# 修改my.cnf(Linux)或my.ini(Windows)[mysqld]server_id=1log_bin=mysql-binbinlog_format=ROW # 必须为ROW模式
2.2 创建ES索引
PUT /user{ \"mappings\": { \"properties\": { \"id\": {\"type\": \"integer\"}, \"name\": {\"type\": \"text\"}, \"email\": {\"type\": \"keyword\"}, \"create_time\": {\"type\": \"date\"} } }}

三、Java代码实现

3.1 Maven依赖
 com.github.shyiko mysql-binlog-connector-java 0.25.4 org.elasticsearch.client elasticsearch-rest-high-level-client 7.17.3
3.2 核心代码(Binlog监听与同步)
import com.github.shyiko.mysql.binlog.BinaryLogClient;import com.github.shyiko.mysql.binlog.event.*;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentType;public class MySQL2ESSyncer { private static final String ES_INDEX = \"user\"; public static void main(String[] args) throws Exception { // 初始化ES客户端 RestHighLevelClient esClient = ESClientFactory.createClient(); // 配置Binlog监听 BinaryLogClient client = new BinaryLogClient(\"localhost\", 3306, \"root\", \"password\"); client.setServerId(1001); // 唯一ID,避免冲突 client.registerEventListener(event -> { EventData data = event.getData(); if (data instanceof WriteRowsEventData) { // 处理插入事件 handleWriteEvent((WriteRowsEventData) data, esClient); } else if (data instanceof UpdateRowsEventData) { // 处理更新事件 handleUpdateEvent((UpdateRowsEventData) data, esClient); } else if (data instanceof DeleteRowsEventData) { // 处理删除事件 handleDeleteEvent((DeleteRowsEventData) data, esClient); } }); client.connect(); // 启动监听 } private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) { eventData.getRows().forEach(row -> { // 假设表结构为:id, name, email, create_time String json = String.format( \"{\\\"id\\\":%d,\\\"name\\\":\\\"%s\\\",\\\"email\\\":\\\"%s\\\",\\\"create_time\\\":\\\"%s\\\"}\", row[0], row[1], row[2], row[3] ); IndexRequest request = new IndexRequest(ES_INDEX) .id(row[0].toString()) .source(json, XContentType.JSON); esClient.index(request, RequestOptions.DEFAULT); }); } // 更新和删除处理类似,代码略(完整源码见文末链接)}

四、源码深度解析

4.1 Binlog监听流程
  • BinaryLogClient:核心类,负责连接MySQL并监听Binlog。

  • 事件类型判断:根据WriteRowsEventDataUpdateRowsEventDataDeleteRowsEventData区分增、改、删操作。

4.2 数据转换关键点
  • Row数据解析:从事件中提取变更的行的具体值,需与表结构顺序对应。

  • ES文档ID:建议使用MySQL主键,确保更新/删除操作能精准定位文档。

4.3 异常处理与优化
  • 重试机制:ES写入失败时,可加入重试队列。

  • 批量提交:攒批写入ES提升性能(需权衡实时性)。

  • 事务一致性:确保Binlog位置持久化,避免数据丢失。

五、方案优缺点对比

方案 实时性 复杂度 资源消耗 定时全量同步 低(分钟级) 低 高 基于触发器 高 高(需改表) 中 Binlog监听

六、总结与扩展

本文实现了基于Binlog的MySQL到ES的实时同步,具备以下优势:

  • 实时性:毫秒级延迟,满足大部分业务场景。

  • 无侵入:无需修改MySQL表结构。

  • 可扩展:可轻松适配其他数据源(如PostgreSQL)。

扩展方向

  • 使用Kafka作为中间层,解耦生产与消费。

  • 增加监控报警,保障数据一致性。

  • 支持DDL变更自动同步(如表结构修改)。

 

地坪展览