Kafka Installation, Usage, and Principles: A Deep Dive

  • I. Kafka Overview
    • 1: Introduction to Kafka
  • II. Kafka Core Concepts and Architecture
    • 1: Core Components
    • 2: Key Concepts
    • 3: Core Principles
    • 4: Producer Mechanics
    • 5: Consumer Groups
    • 6: Replica Synchronization
  • III. Kafka Installation and Deployment
    • 1: Environment Preparation
      • 1.1: Install Java
      • 1.2: Verify the Java Installation
      • 1.3: Download Kafka
    • 2: Single-Node Deployment
      • 2.1: Start ZooKeeper
      • 2.2: Start the Kafka Broker
      • 2.3: Create a Topic
  • IV. Basic Operations in Practice
    • 1: Produce Messages
    • 2: Consume Messages
    • 3: List Consumer Groups
    • 4: Describe a Topic
  • V. Kafka Cluster Configuration
    • 1. Configure a Multi-Node Cluster
      • 1.1 Copy the server.properties File
      • 1.2 Edit the Configuration Files
      • 1.3 Start the Cluster Nodes
    • 2. Create a Topic with a Replication Factor of 3
  • VI. Maven Project Integration
    • 1. Maven Project: Minimal Example
      • 1.1 Add the Maven Dependency
      • 1.2 Producer Code
      • 1.3 Consumer Code
    • 2. Maven Project: Production Scenarios
      • 2.1 Dependency Management
      • 2.2 Thread-Safe Producer Wrapper
      • 2.3 Spring Boot Auto-Configuration
      • 2.4 Batch Consumption
      • 2.5 Dead-Letter Queue (DLQ) Handling
      • 2.6 Micrometer Metrics Integration
      • 2.7 Key Monitoring Metrics
  • VII. Kafka Management Tools
    • 1. Kafka Manager
    • 2. Kafdrop
  • VIII. Kafka Performance Tuning
    • 1. Producer Tuning
    • 2. Consumer Tuning
    • 3. Broker Tuning
  • IX. Kafka Security Configuration
    • 1. SSL Encryption
      • 1.1 Generate SSL Certificates
      • 1.2 Configure server.properties
    • 2. SASL Authentication
  • X. Kafka Monitoring
    • 1. JMX Monitoring
    • 2. Prometheus + Grafana
  • XI. Troubleshooting Common Problems
    • 1. "Address already in use" on Startup
    • 2. Consumers Not Receiving Messages
    • 3. Running Out of Disk Space
    • 4. Guaranteeing Message Order
    • 5. Idempotent Message Handling
  • XII. Example Scenarios
    • 1. Order Status Change Events
    • 2. User Behavior Tracking

I. Kafka Overview

1: Introduction to Kafka

Apache Kafka is a distributed streaming platform with the following characteristics:

  • High throughput: handles millions of messages per second
  • Scalability: scales horizontally and supports cluster deployment
  • Durability: messages are persisted to disk and can be replicated
  • Fault tolerance: automatic failover when a node goes down

Kafka is typically used for:

  • Real-time data pipelines
  • Real-time stream processing
  • Log collection and analysis
  • Event-sourcing architectures

II. Kafka Core Concepts and Architecture

1: Core Components

Architecture notes:

  • Producer: publishes messages to the Kafka cluster
  • Broker: server node responsible for storing and serving messages
  • Consumer: pulls data from the cluster
  • ZooKeeper: manages cluster metadata and controller election (KRaft mode, available since Kafka 2.8 and production-ready since 3.3, removes the ZooKeeper dependency)

2: Key Concepts

  • Topic: logical container that categorizes messages
  • Partition: physical shard; each topic is split into one or more partitions
  • Offset: the unique, monotonically increasing sequence number of a message within a partition
  • Replica: copy of a partition that provides high availability

3: Core Principles

  • Segmented storage: each partition is divided into multiple segments
  • Index files: the .index and .timeindex files enable fast message lookup
  • Sequential writes: the append-only write pattern delivers high throughput

4: Producer Mechanics

sequenceDiagram
    participant Producer
    participant Broker
    Producer->>Broker: 1. Send message
    Broker-->>Producer: 2. Return ACK
    Producer->>Broker: 3. Send batch
    Note right of Broker: PageCache accelerates writes

5: Consumer Groups

  • Rebalance: partitions are automatically reassigned when consumers join or leave the group
  • Offset commits: consumption progress is committed periodically to the internal __consumer_offsets topic

6: Replica Synchronization

graph LR
    Leader[Leader Partition] --> Follower1[Follower 1]
    Leader --> Follower2[Follower 2]
    Follower1 -- Sync --> Leader
    Follower2 -- Sync --> Leader

  • ISR set: the list of replicas that are currently in sync with the leader
  • HW (High Watermark): the offset up to which messages have been replicated to all ISR replicas; consumers can read only up to the HW

III. Kafka Installation and Deployment

1: Environment Preparation

Kafka requires the following runtime environment:

  • Java 8 or later
  • ZooKeeper (from Kafka 2.8.0 onward, KRaft mode makes ZooKeeper optional)

1.1: Install Java

# Ubuntu/Debian
sudo apt update
sudo apt install openjdk-11-jdk

# CentOS/RHEL
sudo yum install java-11-openjdk-devel

1.2: Verify the Java Installation

# Run in a terminal
java -version

# Expected output
openjdk version "11.0.20" 2023-07-18

1.3: Download Kafka

Download from: https://archive.apache.org/dist/kafka

wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz
tar -xzf kafka_2.13-3.6.1.tgz
cd kafka_2.13-3.6.1   # the remaining commands are run from inside this directory

2: Single-Node Deployment

2.1: Start ZooKeeper

Kafka uses ZooKeeper to manage cluster metadata. The Kafka distribution ships with a simple ZooKeeper instance:

# Start ZooKeeper (the bundled instance)
bin/zookeeper-server-start.sh config/zookeeper.properties
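
If you prefer to run without ZooKeeper, Kafka 3.x can instead run in KRaft mode. A minimal sketch based on the official quickstart, using the config/kraft/server.properties file that ships with the 3.6.1 distribution, as an alternative to the ZooKeeper steps above:

# Generate a cluster ID and format the log directories
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties

# Start the broker in KRaft mode (no ZooKeeper process required)
bin/kafka-server-start.sh config/kraft/server.properties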

2.2: Start the Kafka Broker

Open a new terminal window and start the Kafka service:

bin/kafka-server-start.sh config/server.properties

2.3: Create a Topic

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Describe the topic you just created:

bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

IV. Basic Operations in Practice

1: Produce Messages

Start the console producer:

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

Then type a few messages; each line is sent as a separate record when you press Enter.
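
To experiment with keyed records (and therefore key-based partition assignment), the console producer can parse a key from each input line; parse.key and key.separator are standard console-producer properties:

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 --property parse.key=true --property key.separator=:

Each line is then entered as key:value, for example order-1:created.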

2: Consume Messages

Start the console consumer (reading from the beginning of the topic):

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
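
To consume as part of a named consumer group, so that offsets are tracked per group and partitions are shared across instances, add the standard --group option (the group name here is only an example):

bin/kafka-console-consumer.sh --topic quickstart-events --group demo-group --bootstrap-server localhost:9092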

3: List Consumer Groups

List all consumer groups known to the cluster:

bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092

4: Describe a Topic

Show partition, leader, and replica details for a topic:

bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

V. Kafka Cluster Configuration

1. Configure a Multi-Node Cluster

1.1 Copy the server.properties File

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

1.2 Edit the Configuration Files

# server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

# server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2

1.3 Start the Cluster Nodes

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

2. Create a Topic with a Replication Factor of 3

bin/kafka-topics.sh --create --topic replicated-topic --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1
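
With the original broker on port 9092 plus the two new nodes, three brokers are available, which is what a replication factor of 3 requires. You can verify the replica placement with the describe command already used above; the Leader, Replicas, and Isr fields show which broker leads the partition and which replicas are in sync:

bin/kafka-topics.sh --describe --topic replicated-topic --bootstrap-server localhost:9092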

VI. Maven Project Integration

1. Maven Project: Minimal Example

1.1 Add the Maven Dependency

<!-- pom.xml -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

1.2 Producer Code

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AdvancedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);
                // Asynchronous send; the callback runs once the broker acknowledges the record
                producer.send(record, (metadata, e) -> {
                    if (e == null) {
                        System.out.printf("Sent to partition %d offset %d%n",
                                metadata.partition(), metadata.offset());
                    }
                });
            }
        }
    }
}

1.3 Consumer Code

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AdvancedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                // poll blocks for up to one second waiting for new records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("[%s] key=%s, value=%s%n",
                            Thread.currentThread().getName(), record.key(), record.value());
                }
            }
        }
    }
}

2. Maven Project: Production Scenarios

2.1 Dependency Management

<!-- Confluent's BOM is recommended for dependency management -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-bom</artifactId>
            <version>7.6.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- Core client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version>
    </dependency>
    <!-- Recommended in production -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.12</version>
    </dependency>
    <dependency>
        <groupId>io.github.resilience4j</groupId>
        <artifactId>resilience4j-retry</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>

2.2 Thread-Safe Producer Wrapper

@Component
@Slf4j
public class KafkaProducerService {

    private final Producer<String, String> producer;
    private final Retry retry;

    public KafkaProducerService(@Value("${kafka.bootstrap.servers}") String bootstrapServers) {
        // Producer configuration
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "all");                         // strongest durability guarantee
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);            // idempotent producer
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        this.producer = new KafkaProducer<>(props);

        // Retry policy for transient send failures
        this.retry = Retry.of("kafkaProducer", RetryConfig.custom()
                .maxAttempts(3)
                .waitDuration(Duration.ofMillis(500))
                .retryOnException(e -> !(e instanceof UnrecoverableException))
                .build());
    }

    @PreDestroy
    public void close() {
        producer.close(Duration.ofSeconds(30));
    }

    public CompletableFuture<RecordMetadata> sendAsync(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        return CompletableFuture.supplyAsync(() ->
                Retry.decorateCheckedSupplier(retry, () -> {
                    try {
                        return producer.send(record).get();
                    } catch (ExecutionException e) {
                        throw e.getCause();
                    }
                }).unchecked().get())
            .exceptionally(ex -> {
                log.error("Failed to send message to topic {}", topic, ex);
                return null;
            });
    }
}

2.3 Spring Boot Auto-Configuration

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaAutoConfiguration {

    @Bean
    @Primary
    public ProducerFactory<String, Object> producerFactory(KafkaProperties properties) {
        Map<String, Object> configs = properties.buildProducerProperties();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);
        template.setProducerListener(new LoggingProducerListener<>());
        return template;
    }
}

2.4 Batch Consumption

@Slf4j
@Component
public class BatchMessageConsumer {

    @KafkaListener(
            topics = "${kafka.topic.order-events}",
            containerFactory = "batchListenerContainerFactory")
    public void consume(List<ConsumerRecord<String, OrderEvent>> records, Acknowledgment ack) {
        try {
            List<OrderEvent> orders = records.stream()
                    .map(ConsumerRecord::value)
                    .collect(Collectors.toList());

            // Process the whole batch in one business call
            orderService.processBatch(orders);

            // Commit offsets manually only after successful processing
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Batch processing failed", e);
            // Decide whether to retry based on the exception type
            if (e instanceof RecoverableException) {
                throw e; // rethrowing triggers redelivery/retry
            }
            // Skip non-recoverable failures
            ack.acknowledge();
        }
    }
}

// Configuration class
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);                                // enable batch mode
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);   // manual offset commits
        factory.setConcurrency(3);                                     // number of concurrent consumers
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

2.5 Dead-Letter Queue (DLQ) Handling

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);

    // Route failed records to an "error.<topic>" dead-letter topic once the retries are exhausted
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate,
                    (record, ex) -> new TopicPartition("error." + record.topic(), record.partition())),
            new FixedBackOff(1000L, 3L))); // 3 retries, 1 second apart
    return factory;
}

2.6 Micrometer Metrics Integration

@Bean
public KafkaClientMetrics kafkaClientMetrics(ProducerFactory<?, ?> producerFactory) {
    return new KafkaClientMetrics((Producer<?, ?>) producerFactory.createProducer());
}

@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
    return registry -> registry.config().commonTags(
            "application", "order-service",
            "kafka.version", "3.6.1");
}

2.7 Key Monitoring Metrics

# Producer metrics
kafka_producer_record_send_rate
kafka_producer_record_error_rate
kafka_producer_request_latency_avg

# Consumer metrics
kafka_consumer_records_consumed_rate
kafka_consumer_records_lag
kafka_consumer_fetch_latency_avg

VII. Kafka Management Tools

1. Kafka Manager

Kafka Manager is a Kafka cluster management tool open-sourced by Yahoo:

git clone https://github.com/yahoo/kafka-manager.git
cd kafka-manager
./sbt clean dist

2. Kafdrop

Kafdrop is another popular web UI for Kafka:

docker run -d --rm -p 9000:9000 \
    -e KAFKA_BROKERCONNECT=localhost:9092 \
    -e JVM_OPTS="-Xms32M -Xmx64M" \
    -e SERVER_SERVLET_CONTEXTPATH="/" \
    obsidiandynamics/kafdrop

VIII. Kafka Performance Tuning

1. Producer Tuning

# Wait for all in-sync replicas to acknowledge (valid values: all, 0, 1)
acks=all
# Number of retries on failure
retries=5
enable.idempotence=true
# Batch size (bytes)
batch.size=16384
# Send buffer size (bytes)
buffer.memory=33554432
# Time to wait before sending a batch (ms)
linger.ms=5
# Compression type (none, gzip, snappy, lz4, zstd)
compression.type=snappy

2. Consumer Tuning

# Where to start when there is no committed offset (latest or earliest)
auto.offset.reset=earliest
enable.auto.commit=false
# Minimum bytes returned per fetch
fetch.min.bytes=1
# Maximum time the broker waits to fill a fetch (ms)
fetch.max.wait.ms=500
# Maximum records returned per poll
max.poll.records=500
# Maximum time allowed for processing between polls (ms)
max.poll.interval.ms=300000
# Auto-commit interval (ms), only relevant when auto-commit is enabled
auto.commit.interval.ms=5000

3. Broker Tuning

# Log retention time (hours)
log.retention.hours=168
# Log segment size (bytes)
log.segment.bytes=1073741824
# Minimum number of in-sync replicas
min.insync.replicas=2
# Maximum message size (bytes)
message.max.bytes=1000012

IX. Kafka Security Configuration

1. SSL Encryption

1.1 Generate SSL Certificates
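
A minimal sketch of generating the keystore and truststore with keytool and openssl, following the standard Kafka SSL workflow; key aliases, file names, and validity periods are illustrative, and each command prompts for passwords:

# 1. Create the broker keystore with a new key pair
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA

# 2. Create your own certificate authority (CA)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

# 3. Import the CA certificate into the server and client truststores
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert

# 4. Sign the broker certificate with the CA, then import the CA cert and the signed cert into the keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed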

1.2 Configure server.properties:

listeners=SSL://:9093
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=truststore_password
ssl.client.auth=required

2. SASL Authentication

Configure a JAAS file (a sketch follows the broker settings below) and update server.properties:

listeners=SASL_SSL://:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
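
A minimal JAAS file sketch for the PLAIN mechanism; the usernames and passwords are placeholders. Point the broker at the file, for example via KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf", before starting it:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};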

X. Kafka Monitoring

1. JMX Monitoring

Enable JMX when starting Kafka:

JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties

2. Prometheus + Grafana

Use Kafka Exporter to expose metrics to Prometheus:

docker run -d --name kafka-exporter -p 9308:9308 \
    -e KAFKA_BROKERS="kafka1:9092,kafka2:9092" \
    danielqsj/kafka-exporter

XI. Troubleshooting Common Problems

1. "Address already in use" on Startup

Check whether the port is already occupied, or change the port in the Kafka configuration:

netstat -tulnp | grep 9092

2. Consumers Not Receiving Messages

Troubleshooting steps:

  1. Check the consumer group's offsets and lag (to reset offsets, see the sketch after this list):
bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server localhost:9092
  2. Verify network connectivity
  3. Check the consumer logs
  4. Check whether a rebalance has been triggered
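
If the group is stuck or you need to reprocess data, kafka-consumer-groups.sh can also reset the group's offsets (stop the consumers first). The group and topic names below are the example ones used above; drop --execute to preview the change as a dry run:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --topic quickstart-events --reset-offsets --to-earliest --execute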

3. Running Out of Disk Space

Adjust the log retention policy:

log.retention.hours=48
log.retention.bytes=10737418240
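
Broker-level retention settings apply to every topic; retention can also be tightened for a single topic at runtime with kafka-configs.sh (the topic name below is illustrative):

bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name quickstart-events --add-config retention.ms=172800000,retention.bytes=10737418240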

4. Guaranteeing Message Order

// Use a single partition, or key-based partitioning, to preserve ordering
public void sendSequentialMessage(String sequenceId, String message) {
    // Records with the same sequenceId are routed to the same partition
    producer.send(new ProducerRecord<>("sequential-topic", sequenceId, message));
}

// Consumer configuration
@Bean
public ConsumerFactory<String, String> sequentialConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);              // process one record per poll
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024);     // limit fetch size
    return new DefaultKafkaConsumerFactory<>(props);
}

5. Idempotent Message Handling

// Idempotence check on the consumer side
@KafkaListener(topics = "payment-events")
public void handlePayment(PaymentEvent event, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    if (paymentService.isProcessed(key)) {
        log.warn("Duplicate payment event detected: {}", key);
        return;
    }
    paymentService.process(event);
}

// Alternatively, use an external store for idempotence
public void processWithIdempotence(ConsumerRecord<String, String> record) {
    String idempotenceKey = record.key() + "_" + record.offset();
    // setIfAbsent returns false when the key already exists, i.e. the message was seen before
    if (!redisTemplate.opsForValue().setIfAbsent(idempotenceKey, "1", 24, TimeUnit.HOURS)) {
        log.info("Duplicate message skipped: {}", idempotenceKey);
        return;
    }
    // Business logic goes here
}

XII. Example Scenarios

1. Order Status Change Events

// Event definition
public class OrderEvent {
    private String orderId;
    private OrderStatus oldStatus;
    private OrderStatus newStatus;
    private Long timestamp;
    // getters/setters
}

// Producer
public void publishOrderEvent(Order order, OrderStatus newStatus) {
    OrderEvent event = new OrderEvent();
    event.setOrderId(order.getId());
    event.setOldStatus(order.getStatus());
    event.setNewStatus(newStatus);
    event.setTimestamp(System.currentTimeMillis());

    kafkaTemplate.send("order-events", order.getId(), event)
        .addCallback(
            success -> log.info("Order event published: {}", event),
            failure -> log.error("Failed to publish order event", failure)
        );
}

// Consumer
@KafkaListener(topics = "order-events")
public void handleOrderEvent(OrderEvent event) {
    auditService.recordStatusChange(event);
    if (event.getNewStatus() == OrderStatus.SHIPPED) {
        inventoryService.updateStock(event.getOrderId());
    }
}

2. User Behavior Tracking

// Process user-action logs asynchronously
@Async("kafkaThreadPool")
public void trackUserAction(UserAction action) {
    try {
        String json = objectMapper.writeValueAsString(action);
        producer.send(new ProducerRecord<>("user-actions", action.getUserId(), json),
            (metadata, ex) -> {
                if (ex != null) {
                    // On failure, write to a local file for later compensation
                    log.error("Failed to send user action", ex);
                    fallbackWriter.write(action);
                }
            });
    } catch (JsonProcessingException e) {
        log.error("Serialization error", e);
    }
}

// Dedicated thread pool configuration
@Bean("kafkaThreadPool")
public Executor kafkaThreadPool() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(1000);
    executor.setThreadNamePrefix("kafka-producer-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    return executor;
}