Docker 安装 kafka (bitnami/kafka:4.0)_docker kafka
1 拉取镜像
docker pull bitnami/kafka:4.0
2 创建挂载目录
mkdir -p /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data
mkdir -p /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs
3 给挂载目录授权
chmod 777 /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data
chmod 777 /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs
4 运行容器
4.1 运行命令
docker run -d \\ --name bitnami_kafka_4.0 \\ --restart always \\ --ulimit nofile=65536:65536 \\ -e TZ=Asia/Shanghai \\ -e KAFKA_ENABLE_KRAFT=yes \\ -e KAFKA_CFG_NODE_ID=0 \\ -e KAFKA_CFG_PROCESS_ROLES=controller,broker \\ -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093 \\ -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \\ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://113.45.38.93:9092 \\ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \\ -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \\ -e KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data \\ -p 9092:9092 \\ -v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data:/bitnami/kafka \\ -v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs:/opt/bitnami/kafka/logs \\ --memory=512m \\ --cpus=\"1.0\" \\ bitnami/kafka:4.0
4.2 命令解释
下面先给出这条 docker run
命令的整体概览,然后逐行详解各个参数和环境变量的作用。
这条命令启动了一个基于 Bitnami 提供的 Kafka 4.0 镜像的容器,并在 KRaft 模式(即不依赖 Zookeeper)下同时担任 controller 和 broker。它设置了时区、节点 ID、监听器及广告地址,指定数据和日志存储目录,限制了文件句柄数、内存和 CPU 使用,并在主机上映射了对应端口,保证容器重启策略为“始终重启”。
4.2.1 基本启动与命名
docker run -d \\ --name bitnami_kafka_4.0 \\ --restart always \\
docker run -d
:以“后台模式”(detached)启动容器,使其在后台运行,不占用当前终端。--name bitnami_kafka_4.0
:为容器指定名字,方便后续管理和运维。--restart always
:容器退出(无论退出码为何)或 Docker 守护进程重启后,都会自动重启该容器,保证 Kafka 服务的高可用性。
4.2.2 文件句柄限制
--ulimit nofile=65536:65536 \\
--ulimit nofile=65536:65536
:将容器内“最大打开文件数”(nofile
)软限制和硬限制都设置为 65536,避免 Kafka 在高并发情况下因文件描述符不足而崩溃。
4.2.3 时区设置
-e TZ=Asia/Shanghai \\
-e TZ=Asia/Shanghai
:设置容器时区为北京时间(东八区),使 Kafka 日志及监控时间戳与本地时间保持一致,便于分析与排查。
4.2.4 KRaft 模式开启及节点角色
-e KAFKA_ENABLE_KRAFT=yes \\ -e KAFKA_CFG_NODE_ID=0 \\ -e KAFKA_CFG_PROCESS_ROLES=controller,broker \\ -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093 \\
KAFKA_ENABLE_KRAFT=yes
:启用 KRaft 模式(Kafka Raft Metadata),这是 Kafka 未来推荐的无 Zookeeper 架构。KAFKA_CFG_NODE_ID=0
:为该 Kafka 实例分配唯一的节点 ID,KRaft 模式下用于集群内部元数据选举。KAFKA_CFG_PROCESS_ROLES=controller,broker
:指定该节点同时承担 “controller” 和 “broker” 两种角色。KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
:定义控制器选举的投票列表,格式为@:
,此处只有一个节点自身(单节点集群)。
4.2.5 监听器配置
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \\ -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092 \\ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \\ -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \\
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
:定义两个监听端口:PLAINTEXT
:对外提供普通客户端连接,监听容器内部9092
端口;CONTROLLER
:内部控制器通信,监听容器内部9093
端口。
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092
:告诉客户端连接时使用的地址和端口,通常设置为宿主机或对外 IP,方便外部服务或用户连接。KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
:将每个监听器映射到安全协议,此处均为明文(PLAINTEXT)。KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
:指定用于 controller 通信的监听器名称为CONTROLLER
。
4.2.6 日志与数据目录挂载
-e KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data \\ -p 9092:9092 \\ -v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data:/bitnami/kafka \\ -v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs:/opt/bitnami/kafka/logs \\
KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
:Kafka 存储分区数据的目录。-p 9092:9092
:将宿主机的9092
端口映射到容器的9092
,使外部客户端可以访问。-v …/data:/bitnami/kafka
:把宿主机的数据目录挂载到容器中,用于持久化 Kafka topic 数据。-v …/logs:/opt/bitnami/kafka/logs
:把宿主机的日志目录挂载到容器,用于持久化 Kafka 日志文件,便于排查。
4.2.7 资源限制
--memory=512m \\ --cpus=\"1.0\" \\
--memory=512m
:限制容器使用最多 512MB 内存,避免单容器占用过多宿主机资源。--cpus=\"1.0\"
:限制容器最多使用一个 CPU 核心,保障宿主机上其他服务的性能。
4.2.8 镜像与版本
bitnami/kafka:4.0
- 使用 Bitnami 官方维护的
bitnami/kafka:4.0
镜像。Bitnami 镜像通常附带最佳实践的配置和管理脚本,方便快速部署生产级 Kafka 服务。
5 SpringBoot 整合
5.1 引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring.kafka.version}</version> </dependency>
5.2 一次拉取一条消息
5.2.1 yml 配置
spring: kafka: bootstrap-servers: IP:9092 listener: ack-mode: manual_immediate consumer: group-id: kafka_consumer_group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # earliest 启动时,如果找不到有效的偏移量(如消费组第一次启动或偏移量已过期),从 Topic 的最早消息开始消费 # latest(默认值) 启动时,如果找不到有效的偏移量,从 Topic 的最新消息开始消费(跳过历史消息) # none 如果没有有效偏移量,则抛出异常 auto-offset-reset: earliest enable-auto-commit: false producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 发送消息失败允许重试次数 retries: 5
5.2.2 使用 kafka 发送消息
import org.springframework.kafka.core.KafkaTemplate;/** * 消息发送 */@Slf4j@Componentpublic class EventPublisher { @Resource private KafkaTemplate<String, String> kafkaTemplate; public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) { String messageJson = JSONUtil.toJsonStr(eventMessage); publish(topic, messageJson); } public void publish(String topic, String eventMessageJSON) { try { kafkaTemplate.send(topic, eventMessageJSON); log.info(\"发送MQ消息 topic:{} message:{}\", topic, eventMessageJSON); } catch (Exception e) { log.error(\"发送MQ消息失败 topic:{} message:{}\", topic, eventMessageJSON, e); throw e; } }}
5.2.3 使用 kafka 消费消息
@KafkaListener(topics = {topic}, groupId = \"${spring.kafka.consumer.group-id}\") public void listener(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { String message = record.value(); try { log.info(\"监听发送消息,topic: {} message: {}\", topic, message); // 手动确认消费成功 acknowledgment.acknowledge(); } catch (Exception e) { log.error(\"监听发送消息,消费失败 topic: {} message: {}\", topic, message); } }
5.3 批量拉取消息
5.3.1 yml 配置
增加 spring.kafka.consumer.max-poll-records
,提高 kafka 消费者批量拉去消息数量;
增加 spring.kafka.consumer.fetch-min-size
(Broker 至少准备多少字节数据才返回给 Consumer) 和 spring.kafka.consumer.fetch-max-wait
(如果数据没达到 fetch.min.bytes,最多等待多长时间才返回),提高 kafka 消费者批量拉取效率;
spring: kafka: bootstrap-servers: IP:9092 listener: ack-mode: manual_immediate consumer: group-id: kafka_consumer_group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # earliest 启动时,如果找不到有效的偏移量(如消费组第一次启动或偏移量已过期),从 Topic 的最早消息开始消费 # latest(默认值) 启动时,如果找不到有效的偏移量,从 Topic 的最新消息开始消费(跳过历史消息) # none 如果没有有效偏移量,则抛出异常 auto-offset-reset: earliest enable-auto-commit: false # 控制每次 poll() 从 Kafka 中拉取的最大消息数 max-poll-records: 5 # 拉取最少1MB数据 fetch-min-size: 1048576 # 最多等100ms fetch-max-wait: 100 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # 发送消息失败允许重试次数 retries: 5
5.3.2 配置批处理容器工厂
package com.scheme.kafka.factory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.listener.ContainerProperties;@Configurationpublic class KafkaConsumerConfig { @Bean(\"batchFactory\") public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory( ConsumerFactory<String, String> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); // 开启批量消费 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // 手动 ack(可选) return factory; }}
5.3.3 使用 kafka 发送消息
package com.scheme.kafka.producer;import jakarta.annotation.PostConstruct;import jakarta.annotation.Resource;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;@Componentpublic class TestProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; @PostConstruct public void init() { for(int i = 0; i < 10; i++) { kafkaTemplate.send(\"test_topic\", String.format(\"testMessage%s\", i)); } }}
5.3.4 使用 kafka 消费消息
package com.scheme.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;import java.util.List;@Componentpublic class TestConsumer { @KafkaListener(topics = {\"test_topic\"}, containerFactory = \"batchFactory\") public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgmente) { System.out.println(records); for (ConsumerRecord<String, String> record : records) { String value = record.value(); System.out.println(\"================================================================================\"); System.out.println(value); System.out.println(\"================================================================================\"); } acknowledgmente.acknowledge(); }}
5.3.5 containerFactory
参数介绍
containerFactory = \"batchFactory\"
是 Spring Kafka 中 @KafkaListener
注解的参数,用于指定KafkaListenerContainerFactory 的 Bean 名称,决定当前监听器的消息消费方式和行为策略。
参数说明
@KafkaListener( topics = \"my-topic\", containerFactory = \"batchFactory\")
containerFactory
KafkaListenerContainerFactory
Bean)来创建消费者容器。常见容器工厂区别
\"batchFactory\"
List<ConsumerRecord>
。6 参考
6.1 配置文件地址
/opt/bitnami/kafka/config/