k8s 配置 Kafka SASL_SSL双重认证_kafka sasl配置
说明
kafka
提供了多种安全认证机制,主要分为SASL
和SSL
两大类。
SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka
中启用 SASL_SSL
安全协议时,SASL
用于客户端和服务器之间的身份验证,SSL
则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。
要在 Kubernetes 中为 Kafka 配置 SASL_SSL,你需要先完成以下两大步骤:
第一部分:生成 SSL 证书用于 SASL_SSL(JKS 格式)
Kafka 使用 Java 的 Keystore/Truststore(.jks
文件)作为证书格式。
步骤 1:创建 CA 私钥和证书(自签名)
# CA 私钥openssl genrsa -out ca-key.pem 2048# CA 证书openssl req -x509 -new -key ca-key.pem -days 3650 -out ca-cert.pem -subj \"/CN=Kafka-CA\"
步骤 2:创建 Kafka 服务器的证书请求并签发
# 生成 Kafka 服务私钥openssl genrsa -out kafka-key.pem 2048# 创建证书请求openssl req -new -key kafka-key.pem -out kafka.csr -subj \"/CN=localhost\"# 使用 CA 签发openssl x509 -req -in kafka.csr -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out kafka-signed.crt -days 3650
步骤 3:生成 JKS(Java keystore)
# 转换成 PKCS12openssl pkcs12 -export \\ -in kafka-signed.crt \\ -inkey kafka-key.pem \\ -out kafka-keystore.p12 \\ -name kafka \\ -CAfile ca-cert.pem \\ -caname root \\ -password pass:123456# 导入为 keystore.jkskeytool -importkeystore \\ -deststorepass 123456 -destkeypass 123456 \\ -destkeystore kafka.keystore.jks \\ -srckeystore kafka-keystore.p12 -srcstoretype PKCS12 \\ -srcstorepass 123456 \\ -alias kafka
步骤 4: 创建 truststore.jks(包含 CA)
keytool -import \\ -trustcacerts \\ -alias CARoot \\ -file ca-cert.pem \\ -keystore kafka.truststore.jks \\ -storepass 123456 \\ -noprompt
附:客户端证书生成(如需双向认证)
# 客户端私钥openssl genrsa -out client-key.pem 2048# 客户端证书请求openssl req -new -key client-key.pem -out client.csr -subj \"/CN=kafka-client\"# CA 签发客户端证书openssl x509 -req -in client.csr -CA ca-cert.pem -CAkey ca-key.pem -CAcreateserial -out client-cert.pem -days 3650
✅ 生成完成的文件:
kafka.keystore.jks
:服务端使用(含私钥)kafka.truststore.jks
:客户端和服务端都使用(信任CA)- 密码:统一用
123456
(你可以自定义)
✅ 第二部分:Kafka 中配置 SASL_SSL
将上述文件配置到 Kafka 中(假设你在 Kubernetes 中运行):
🗂 配置 1:Kafka 环境变量(在 StatefulSet / Deployment 中)
env: # --- 控制器配置(KRaft 模式) --- - name: KAFKA_CFG_NODE_ID value: \"0\" # 当前节点的唯一 ID(用于 controller quorum) - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES value: CONTROLLER # 控制器监听使用的 listener 名称,需与 KAFKA_CFG_LISTENERS 中一致 - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_NUM value: \"10\" # 控制器选举客户端配额窗口数量(流控相关) - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_SIZE_SECONDS value: \"1\" # 每个窗口的时长,单位秒 - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_REQUEST_TIMEOUT_MS value: \"5000\" # 控制器之间选举通信的请求超时时间(毫秒) - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS value: \"0@kafka:9093\" # controller 选举配置:nodeId@host:port # --- Kafka 常规配置 --- - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE value: \"true\" # 启用自动创建 topic(生产建议关闭) - name: KAFKA_ENABLE_KRAFT value: \"YES\" # 启用 KRaft 模式(即不使用 ZooKeeper) # --- 节点角色定义 --- - name: KAFKA_CFG_PROCESS_ROLES value: \"broker,controller\" # 当前节点同时担任 broker 和 controller 角色 # --- Listener 与协议映射 --- - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP value: \"CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:SASL_SSL\" # 每个 listener 使用的安全协议 # CONTROLLER 使用 SASL_PLAINTEXT,用于控制器通信 # PLAINTEXT 实际绑定 SASL_SSL,用于 broker/client 通信(命名不影响协议) - name: KAFKA_INTER_BROKER_LISTENER_NAME value: PLAINTEXT # Kafka Broker 间通信使用的 listener 名称(上方定义) - name: KAFKA_CFG_LISTENERS value: PLAINTEXT://:9092,CONTROLLER://:9093 # Broker 和 Controller 的监听端口及协议标识 - name: KAFKA_CFG_ADVERTISED_LISTENERS value: PLAINTEXT://192.168.1.5:9092 # Kafka 向客户端暴露的访问地址 # --- SSL 配置 --- - name: KAFKA_SSL_KEYSTORE_LOCATION value: /bitnami/kafka/config/certs/kafka.keystore.jks # 服务端密钥 + 证书文件路径 - name: KAFKA_SSL_KEYSTORE_PASSWORD value: 123456 # keystore 文件访问密码 - name: KAFKA_SSL_KEY_PASSWORD value: 123456 # keystore 内私钥使用的密码 - name: KAFKA_SSL_TRUSTSTORE_LOCATION value: /bitnami/kafka/config/certs/kafka.truststore.jks # CA 证书文件路径(信任的客户端) - name: KAFKA_SSL_TRUSTSTORE_PASSWORD value: 123456 # truststore 文件访问密码 - name: KAFKA_SSL_CLIENT_AUTH value: required # 启用客户端证书校验(双向认证) # --- SASL 配置(PLAIN 机制)--- - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL value: PLAIN # 控制器间通信使用 PLAIN SASL 机制 - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL value: PLAIN # Broker 间通信使用 PLAIN SASL 机制 # --- TLS 类型与证书密码 --- - name: KAFKA_TLS_TYPE value: JKS # TLS 密钥文件类型(Java KeyStore) - name: KAFKA_CERTIFICATE_PASSWORD value: 123456# 证书统一使用的访问密码(用于 SSL 参数) # --- 监听器角色映射 --- - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME value: SASL_SSL # Broker 间通信使用的安全 listener 名称 - name: KAFKA_CLIENT_LISTENER_NAME value: SASL_SSL # 客户端连接 Kafka 使用的 listener 名称 # --- SASL 用户配置 --- - name: KAFKA_CONTROLLER_USER value: kafka # 控制器之间通信使用的用户名 - name: KAFKA_CONTROLLER_PASSWORD value: kafka123 # 控制器之间通信使用的密码 - name: KAFKA_INTER_BROKER_USER value: kafka # Broker 间通信使用的用户名 - name: KAFKA_INTER_BROKER_PASSWORD value: kafka123 # Broker 间通信使用的密码 - name: KAFKA_CLIENT_USERS value: kafka # 允许连接的客户端用户名(多个用逗号分隔) - name: KAFKA_CLIENT_PASSWORDS value: kafka123 # 客户端对应密码,顺序与用户名保持一致
🗂 配置 2:Kubernetes 中挂载证书和 JAAS 文件
volumeMounts: - name: kafka-secrets mountPath: /bitnami/kafka/config/certsvolumes: - name: kafka-secrets secret: secretName: kafka-cert-secret - name: jaas-config configMap: name: kafka-jaas-config
你需要将 .jks
文件和密码打包为 Secret:
kubectl create secret generic kafka-cert-secret -n 命名空间 --from-file=kafka.keystore.jks --from-file=kafka.truststore.jks
✅ Kafka 客户端配置(示例)
yaml 配置文件
spring: kafka: bootstrap-servers: 192.168.1.5:9092 # Kafka Broker 的地址和端口 listener: ack-mode: MANUAL_IMMEDIATE # 消费者手动提交消息确认(ack),立即提交(MANUAL_IMMEDIATE),适合需要精确控制 offset 提交时机的业务场景。 consumer: custom-environment: dev # 自定义字段,可用于 profile 配置(非 Spring Kafka 标准字段) auto-offset-reset: latest # 从最新消息开始消费(若无提交 offset) enable-auto-commit: false # 关闭自动提交,需手动调用 ack.acknowledge() # auto-commit-interval: 2000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 50 # 每次 poll 最多拉取 50 条记录 max-poll-interval-ms: 600000 # 最大处理时间 10 分钟,超时视为挂掉 producer: retries: 0 # 不重试(可调高) batch-size: 16384 # 每批 16KB,达到此大小或 linger.ms 超时才发送 buffer-memory: 33554432 # 缓冲区大小 # String 类型键值序列化 key-serializer: org.apache.kafka.common.serialization.StringSerializer # value-serializer: org.apache.kafka.common.serialization.StringSerializer ssl: # Kafka 客户端验证服务端证书是否可信(客户端信任的 CA 证书放在 truststore 中) # .jks 文件必须放在 resources/ 目录下并打包到 classpath trust-store-location: classpath:kafka.truststore.jks trust-store-password: 123456 properties: sasl: mechanism: PLAIN # 使用 SASL/PLAIN 机制进行身份验证 jaas: # 此处填写 SASL登录时分配的用户名密码(注意password结尾;) # 此处用户名 kafka 和密码 kafka123 必须与服务端 Kafka 设置的 KAFKA_CLIENT_USERS 一致 config: org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafka\" password=\"kafka123\"; security: protocol: SASL_SSL # 通信使用 SASL 认证 + SSL 加密 ssl: endpoint: identification: algorithm: \"\" # 关闭主机名验证,否则会因 SAN 缺失导致 SSL 握手失败(Java 默认开启) # ssl.endpoint.identification.algorithm= # producer.ssl.endpoint.identification.algorithm= # consumer.ssl.endpoint.identification.algorithm=
生产者
import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerSaslSslExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, \"your-kafka-broker:9092\"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 配置SASL认证方式为SASL_SSL props.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, \"SASL_SSL\"); props.put(ProducerConfig.SASL_MECHANISM, \"PLAIN\"); // 或者其他支持的SASL机制 // 配置Kerberos认证所需的相关参数 props.put(ProducerConfig.SASL_JAAS_CONFIG, \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"\\\" password=\\\"\\\";\"); Producer<String, String> producer = new KafkaProducer<>(props); // 生产者使用示例 producer.send(new ProducerRecord<>(\"your-topic\", \"message-key\", \"message-value\"), (metadata, exception) -> { if (exception == null) { System.out.println(\"消息发送成功\"); } else { exception.printStackTrace(); } }); producer.close(); }}
注意
在这个示例中,我们配置了 Kafka 生产者所需的基本参数,并通过ProducerConfig.SECURITY_PROTOCOL_CONFIG
指定了安全协议为 SASL_PLAINTEXT
。然后,我们设置了 SASL_MECHANISM_CONFIG
为 PLAIN
并提供了 JAAS
配置 (SASL_JAAS_CONFIG
),其中包含了用于连接到 Kafka 集群的用户名和密码。
请确保将 ,
, kafka-broker1:9092, kafka-broker2:9092
, 和 your-topic
替换为你的实际用户名、密码、Kafka 代理地址、主题名称。
你的Kafka集群已经配置了SSL和SASL认证,并且相关的安全设置是正确的
消费者
@Configurationpublic class KafkaConfig { @Value(\"${spring.kafka.bootstrap-servers}\") private String bootstrapServers; @Value(\"${spring.kafka.consumer.auto-offset-reset}\") private String autoOffsetReset; @Value(\"${spring.kafka.consumer.enable-auto-commit}\") private String enableAutoCommit; @Value(\"${spring.kafka.consumer.max-poll-records}\") private String maxPollRecords; @Value(\"${spring.kafka.consumer.max-poll-interval-ms}\") private String maxPollIntervalMs; @Bean public ConsumerFactory<String, String> custConsumerConfigFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, \"test\"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); props.put(\"security.protocol\", \"SASL_SSL\"); props.put(\"sasl.mechanism\", \"PLAIN\"); props.put(\"ssl.endpoint.identification.algorithm\", \"\"); props.put(\"consumer.ssl.endpoint.identification.algorithm\", \"\"); props.put(\"sasl.jaas.config\", \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"kafka\\\" password=\\\"kafka123\\\";\"); // SSL配置 props.put(\"ssl.truststore.location\", \"D:\\\\code\\\\ideaprojects\\\\zhubay-test\\\\src\\\\main\\\\resources\\\\kafka.truststore.jks\"); props.put(\"ssl.truststore.password\", \"123456\"); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> daConsumerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(custConsumerConfigFactory()); factory.setBatchListener(true); // 启用批量消费 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; }}
在这个示例中,我们配置了KafkaConsumer
以使用SASL_SSL
协议进行通信,并且指定了SASL
的PLAIN
认证机制。我们还需要指定SSL
的信任库和密钥库的位置以及它们的密码。sasl.jaas.config
属性中应该包含有效的JAAS
配置,它定义了用于认证的用户名和密码。