如何保证 Kafka 数据实时同步到 Elasticsearch?_es连接kafka
Kafka 数据实时同步到 Elasticsearch数据同步
- 核心配置文件 (需要创建新文件)
name=elasticsearch-sinkconnector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max=3topics=target_topickey.ignore=trueconnection.url=http://localhost:9200type.name=_docvalue.converter=org.apache.confluent.connect.json.JsonSchemaConvertervalue.converter.schema.registry.url=http://localhost:8081schema.ignore=true
- 启动命令
.\\bin\\windows\\connect-standalone.bat .\\config\\connect-standalone.properties .\\config\\es-sink.properties
关键优化参数说明:
batch.size=2000
:控制批量写入ES的文档数量max.in.flight.requests=5
:提升写入吞吐量flush.timeout.ms=30000
:设置刷新超时时间retry.backoff.ms=5000
:失败重试间隔max.retries=10
:最大重试次数
监控指标:
// 在Kafka Connect配置中添加监控config.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, 10000);config.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, 10000);
异常处理方案:
- 配置死信队列(DLQ)处理失败记录:
errors.tolerance=allerrors.deadletterqueue.topic.name=dlq-topicerrors.deadletterqueue.context.headers.enable=true
- 实现重试策略:
RetryUtil.executeWithRetry( () -> client.bulk(request), 3, Duration.ofSeconds(2), Arrays.asList(ElasticsearchTimeoutException.class));
kafka数据转换和清洗到Elasticsearch
1. 核心配置文件(新增)
# 连接器基础配置name=es-sink-connector # 连接器实例名称connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max=3 # 并行任务数(根据ES集群规模调整)topics=user_behavior # 消费的Kafka主题名称# Elasticsearch配置connection.url=http://localhost:9200 # ES集群地址type.name=_doc # 文档类型(ES7+固定值)behavior.on.null.values=ignore # 空值处理策略# 数据转换配置value.converter=com.example.EsRecordConverter # 自定义转换器(对应下方Java代码)value.converter.schemas.enable=false # 禁用schema验证# 清洗配置SMT链transforms=FilterInvalid,FormatField,AddTimestamptransforms.FilterInvalid.type=org.apache.kafka.connect.transforms.Filter$Valuetransforms.FilterInvalid.predicate=HasUserId # 过滤无用户ID的记录transforms.FilterInvalid.negate=truetransforms.FormatField.type=org.apache.kafka.connect.transforms.ReplaceField$Value transforms.FormatField.renames=userId:user_id,ipAddr:client_ip # 字段重命名transforms.AddTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Valuetransforms.AddTimestamp.timestamp.field=event_time # 添加处理时间戳# 异常处理配置errors.tolerance=all # 错误容忍模式errors.deadletterqueue.topic.name=es_sink_dlq # 死信队列名称errors.deadletterqueue.context.headers.enable=true # 保留错误上下文# 性能优化batch.size=2000 # 批量写入条数flush.timeout.ms=30000 # 刷新超时时间max.in.flight.requests=5 # 并发请求数
2. 自定义转换器代码(新增)
/** * 自定义数据转换器(对应配置中的value.converter) */public class EsRecordConverter implements Converter { private static final Logger LOG = LoggerFactory.getLogger(EsRecordConverter.class); // 数据转换入口方法 @Override public SchemaAndValue toConnectData(String topic, byte[] value) { try { JSONObject json = new JSONObject(new String(value, \"UTF-8\")); // 数据清洗:IP地址标准化 if(json.has(\"client_ip\")) { String ip = json.getString(\"client_ip\"); json.put(\"client_ip\", ip.replace(\" \", \"\")); // 去除空格 } // 添加清洗标记 json.put(\"clean_flag\", calculateCleanFlag(json)); return new SchemaAndValue(null, json.toString()); } catch (Exception e) { LOG.error(\"Data conversion failed\", e); throw new DataException(\"Conversion error\", e); } } // 生成数据清洗哈希值 private String calculateCleanFlag(JSONObject data) { String rawData = data.toString(); return DigestUtils.sha256Hex(rawData); }}
3. Elasticsearch预处理管道(命令行执行)
# 创建数据预处理管道curl -X PUT \"localhost:9200/_ingest/pipeline/kafka_pipeline\" -H \'Content-Type: application/json\' -d\'{ \"description\": \"Data final cleaning\", \"processors\": [ { \"remove\": { # 移除调试字段 \"field\": [\"debug_info\", \"temp_field\"], \"ignore_missing\": true } }, { \"date\": { # 时间格式标准化 \"field\": \"event_time\", \"target_field\": \"@timestamp\", \"formats\": [\"UNIX_MS\"] } } ]}\'
4. 启动命令(Windows环境)
:: 启动独立模式连接器.\\bin\\windows\\connect-standalone.bat ^ # 主启动脚本.\\config\\connect-standalone.properties ^ # 通用配置.\\config\\es-sink.properties # 当前连接器配置:: 参数说明::: 1. connect-standalone.properties - Kafka Connect基础配置:: 2. es-sink.properties - 当前连接器专属配置
- 数据验证命令
# 查看ES中的清洗后数据curl -X GET \"localhost:9200/user_behavior/_search?pretty\" -H \'Content-Type: application/json\' -d\'{ \"query\": { \"term\": { \"clean_flag\": \"d4e5f6...\" # 替换实际哈希值 } }}\'# 检查死信队列(需配置kafka-cli).\\bin\\windows\\kafka-console-consumer.bat ^--bootstrap-server localhost:9092 ^--topic es_sink_dlq ^--from-beginning
方案特点:
-
三层清洗架构:
- SMT层:基础格式处理
- 转换器层:业务逻辑清洗
- ES管道层:存储前终检
-
追踪机制:
- 通过clean_flag字段实现数据溯源
- 死信队列保留原始错误数据
-
性能平衡:
- 批量大小与内存占用的最佳实践
- 重试策略:10次指数退避重试(max.retries=10)