
Streaming Kafka topic data directly to Elasticsearch with the Elasticsearch Service Sink Connector (kafka-connect-elasticsearch)

Link: Elasticsearch Service Sink Connector for Confluent Platform | Confluent Documentation

Link: Apache Kafka

1. Setting up the test environment

Download the Elasticsearch Service Sink Connector:

https://file.zjwlyy.cn/confluentinc-kafka-connect-elasticsearch-15.0.0.zip
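For example, fetch it on the host with wget:

wget https://file.zjwlyy.cn/confluentinc-kafka-connect-elasticsearch-15.0.0.zip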

For convenience, Kafka and Elasticsearch are set up with Docker.

docker run -d --name elasticsearch -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms512m -Xmx512m" -p 9200:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:7.17.1

docker run --user root -d --name kafka -p 9092:9092 -p 8083:8083 apache/kafka:3.9.1
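Before continuing, verify that both containers are up and that Elasticsearch responds:

docker ps --filter name=elasticsearch --filter name=kafka   # both should show as Up

curl http://localhost:9200   # should return the cluster info JSON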

Copy the confluentinc-kafka-connect-elasticsearch-15.0.0.zip file into the Kafka container. Create the target directory first, since docker cp does not create it:

docker exec kafka mkdir -p /opt/connectors

docker cp confluentinc-kafka-connect-elasticsearch-15.0.0.zip kafka:/opt/connectors

Enter the Kafka container:

docker exec -it kafka /bin/bash

Edit the worker configuration file so plugin.path points at the directory where the zip will be extracted:

vi /opt/kafka/config/connect-standalone.properties

plugin.path=/opt/connectors
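The rest of connect-standalone.properties can stay at its shipped defaults. For reference, a minimal standalone worker file for this setup looks roughly like this (all values besides plugin.path match the stock Apache Kafka defaults):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/connectors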

Unzip the archive inside /opt/connectors:

cd /opt/connectors
unzip confluentinc-kafka-connect-elasticsearch-15.0.0.zip

Edit the connector configuration file:

vi /opt/connectors/confluentinc-kafka-connect-elasticsearch-15.0.0/etc/quickstart-elasticsearch.properties

# Basic configuration
name=t-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
# Adjust to the number of topic partitions
tasks.max=3
topics=t-elasticsearch-sink
key.ignore=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# ES connection configuration (multiple nodes can be listed for load balancing)
connection.url=http://192.168.1.1:9200
type.name=_doc
index.name=t-elasticsearch-sink
# Create the index automatically (or pre-create it manually, as shown below)
#index.auto.create=true
schema.ignore=true

# Fault tolerance and error handling
errors.tolerance=all
# A dead letter queue should be configured
#errors.deadletterqueue.topic.name=dlq_t4_elasticsearch
# Keep the error context in record headers
#errors.deadletterqueue.context.headers.enable=true
# Skip messages with null values
behavior.on.null.values=IGNORE

# Performance tuning
# Batch writes to improve throughput
batch.size=2000
# Number of concurrent requests
max.in.flight.requests=5
# Number of retries on failure
max.retries=10
# Retry interval (ms)
retry.backoff.ms=5000
# Read timeout (ms)
read.timeout.ms=10000
# Connection timeout (ms)
connection.timeout.ms=10000
# Flush timeout (ms)
flush.timeout.ms=30000
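If index.auto.create stays disabled, the index has to exist before the connector writes to it. A minimal pre-creation sketch, assuming the host matches connection.url and a simple document shape (the user/message/ts fields are hypothetical examples):

curl -X PUT 'http://127.0.0.1:9200/t-elasticsearch-sink' -H 'Content-Type: application/json' -d '
{
  "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
  "mappings": {
    "properties": {
      "user":    { "type": "keyword" },
      "message": { "type": "text" },
      "ts":      { "type": "date" }
    }
  }
}'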

Experimental configuration, not yet verified (creates ES indices based on the date):

Kafka Elasticsearch Sink Connector and the Power of Single Message Transformations | sap1ens.com

transforms=timestampRouter
transforms.timestampRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.timestampRouter.topic.format=${topic}-${timestamp}
transforms.timestampRouter.timestamp.format=yyyy-MM-dd
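TimestampRouter rewrites each record's topic name before the sink derives the index from it, so writes are routed to one index per day; for example (an illustrative date), a record timestamped 2024-01-15 would land in:

t-elasticsearch-sink-2024-01-15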

Start the Connector:

cd /opt/kafka/bin

./connect-standalone.sh -daemon ../config/connect-standalone.properties /opt/connectors/confluentinc-kafka-connect-elasticsearch-15.0.0/etc/quickstart-elasticsearch.properties
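Once the worker is up, the Connect REST interface on port 8083 can confirm that the plugin was loaded and the connector registered:

curl http://localhost:8083/connector-plugins   # the list should include ElasticsearchSinkConnector

curl http://localhost:8083/connectors   # should return ["t-elasticsearch-sink"]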

2. Checking the Connector status

curl -XGET http://localhost:8083/connectors/t-elasticsearch-sink/status   # check the status

curl -XGET http://localhost:8083/connectors/t-elasticsearch-sink/config   # view the configuration

curl -X DELETE http://localhost:8083/connectors/t-elasticsearch-sink/offsets   # reset the offsets (the connector must be stopped first)

curl -X DELETE http://localhost:8083/connectors/t-elasticsearch-sink   # delete this connector
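A healthy connector reports RUNNING for the connector and every task; the status response looks roughly like this (the worker_id value will differ):

{
  "name": "t-elasticsearch-sink",
  "connector": { "state": "RUNNING", "worker_id": "172.17.0.3:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "172.17.0.3:8083" } ],
  "type": "sink"
}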

3. Testing writes

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list   # list topics

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic t-elasticsearch-sink   # delete the topic

./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic t-elasticsearch-sink   # write messages line by line
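Since the connector uses JsonConverter with schemas disabled, each line typed into the console producer should be a standalone JSON object. The field names here are only examples:

{"user": "alice", "message": "hello", "ts": 1700000000000}
{"user": "bob", "message": "world", "ts": 1700000001000}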

4. Checking the ES index

curl http://127.0.0.1:9200/_cat/indices?v
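If the sink is working, the t-elasticsearch-sink index appears in the list, and the documents written above can be fetched back:

curl 'http://127.0.0.1:9200/t-elasticsearch-sink/_search?pretty'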