> 技术文档 > 蚂蚁面试:Kafka 如何做压测?如何保证系统稳定?_kafka压测

蚂蚁面试:Kafka 如何做压测?如何保证系统稳定?_kafka压测

Kafka是大数据领域应用非常广泛的消息中间件,如何确定Kafka集群的最大吞吐量和延迟呢?又如何保证Kafka集群的稳定呢?今天我们来介绍Kafka压测方案,来确认Kafka集群的各类指标。

  一、Kafka自带性能测试工具

  Kafka提供了内置的性能测试工具,可以用于生产者消费者的基准测试:

  ·生产者性能测试工具:kafka-producer-perf-test.sh

  · 消费者性能测试工具:kafka-consumer-perf-test.sh

  第三方压测工具JMeter:

  · 可以使用JMeter的Kafka插件进行压测Tsung

  · 支持Kafka协议的分布式压测工具Gatling

  · 可以通过Kafka插件进行压测

  二、压测场景设计

  1. 生产者性能测试

  测试不同消息大小、批处理设置和压缩算法对生产者性能的影响:

# 测试100字节消息,无压缩   /opt/kafka/bin/kafka-producer-perf-test.sh \\    --topic test-topic \\    --num-records 10000000 \\    --record-size 100 \\    --throughput -1 \\    --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\    acks=1 \\    batch.size=16384 \\    linger.ms=0 \\    compression.type=none   # 测试1KB消息,使用lz4压缩   /opt/kafka/bin/kafka-producer-perf-test.sh \\    --topic test-topic \\    --num-records 10000000 \\    --record-size 1024 \\    --throughput -1 \\    --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\    acks=1 \\    batch.size=65536 \\    linger.ms=10 \\    compression.type=lz4

 

 2. 消费者性能测试

  测试不同消费者组配置和分区数对消费性能的影响:

 # 基本消费者性能测试   /opt/kafka/bin/kafka-consumer-perf-test.sh \\    --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \\    --topic test-topic \\    --messages 10000000 \\    --threads 1 \\    --print-metrics   # 多线程消费者测试   /opt/kafka/bin/kafka-consumer-perf-test.sh \\    --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \\    --topic test-topic \\    --messages 10000000 \\    --threads 8 \\    --print-metrics

 

3. 端到端延迟测试

  测量从生产到消费的端到端延迟:

# 创建一个具有多个分区的测试主题   /opt/kafka/bin/kafka-topics.sh \\    --bootstrap-server broker1:9092 \\    --create \\    --topic latency-test \\    --partitions 8 \\    --replication-factor 3   # 使用自定义Java程序测量端到端延迟  // 生产者代码示例   Properties props = new Properties();   props.put(\"bootstrap.servers\", \"broker1:9092,broker2:9092,broker3:9092\");   props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");   props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");   props.put(\"acks\", \"all\");   Producer producer = new KafkaProducer(props);   for (int i = 0; i < 10000; i++) {    long timestamp = System.currentTimeMillis();    ProducerRecord record =    new ProducerRecord(\"latency-test\", null, timestamp, \"key-\" + i, \"value-\" + timestamp);    producer.send(record);    Thread.sleep(100); // 每秒发送10条消息   }   producer.close();   // 消费者代码示例   Properties props = new Properties();   props.put(\"bootstrap.servers\", \"broker1:9092,broker2:9092,broker3:9092\");   props.put(\"group.id\", \"latency-test-group\");   props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");   props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");   props.put(\"auto.offset.reset\", \"earliest\");   KafkaConsumer consumer = new KafkaConsumer(props);   consumer.subscribe(Collections.singletonList(\"latency-test\"));   while (true) {    ConsumerRecords records = consumer.poll(Duration.ofMillis(100));    for (ConsumerRecord record : records) {    long latency = System.currentTimeMillis() - record.timestamp();    System.out.printf(\"Offset = %d, Latency = %d ms%n\", record.offset(), latency);    }   }

4. 吞吐量与延迟权衡测试

  测试不同配置下吞吐量与延迟的权衡关系:

# 高吞吐量配置测试   /opt/kafka/bin/kafka-producer-perf-test.sh \\    --topic throughput-test \\    --num-records 5000000 \\    --record-size 1024 \\    --throughput -1 \\    --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\    acks=1 \\    batch.size=131072 \\    linger.ms=50 \\    compression.type=lz4 \\    buffer.memory=67108864   # 低延迟配置测试   /opt/kafka/bin/kafka-producer-perf-test.sh \\    --topic latency-test \\    --num-records 1000000 \\    --record-size 1024 \\    --throughput -1 \\    --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\    acks=1 \\    batch.size=8192 \\    linger.ms=0 \\    compression.type=none

三、压测指标分析

  1. 生产者关键指标

  吞吐量(Throughput):每秒处理的消息数或字节数

  延迟(Latency):消息从发送到确认的时间

  CPU使用率:生产者进程的CPU使用情况

  内存使用率:生产者进程的内存使用情况

  批处理率:每批次的平均消息数

  2. 消费者关键指标

  吞吐量:每秒消费的消息数或字节数

  延迟:消息从生产到消费的时间

  消费者滞后(Consumer Lag):消费者落后于生产者的消息数

  处理时间:消费者处理每条消息的时间

  提交率:偏移量提交的频率和成功率

  3. Broker关键指标

  请求处理率:每秒处理的请求数

  请求队列大小:等待处理的请求数

  网络吞吐量:进出Broker的网络流量

  磁盘使用率:日志文件的增长速率

  GC暂停时间:垃圾收集对性能的影响

  四、压测结果解读

  1. 生产者性能分析

  以下是一个典型的生产者性能测试结果示例:

  100000 records sent, 25000.0 records/sec (24.41 MB/sec), 15.2 ms avg latency, 293.0 ms max latency.   200000 records sent, 26315.8 records/sec (25.67 MB/sec), 12.8 ms avg latency, 128.0 ms max latency.   300000 records sent, 27272.7 records/sec (26.61 MB/sec), 11.5 ms avg latency, 98.0 ms max latency.

结果解读:

  吞吐量随时间稳定在约26,000条记录/秒(约25MB/秒)

  平均延迟约为13毫秒,最大延迟为293毫秒

  随着测试进行,延迟趋于稳定,表明系统性能良好

  2. 消费者性能分析

  以下是一个典型的消费者性能测试结果示例:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec   2023-05-01 10:00:00, 2023-05-01 10:01:00, 1024.00, 17.07, 1048576, 17476.27, 20, 60000, 17.07, 17476.27

 

结果解读:

  消费速率为17.07MB/秒,约17,476条消息/秒

  重平衡时间为20毫秒,表明消费者组协调效率高

  获取时间为60秒,与测试持续时间一致

  3. 瓶颈识别与解决

  常见的性能瓶颈及解决方案:

  CPU瓶颈:

  ·增加broker数量

  · 优化消息压缩算法

  · 调整JVM参数

  内存瓶颈:

  · 增加堆内存大小

  · 优化生产者/消费者客户端缓冲区大小

  · 减少不必要的对象创建

  磁盘I/O瓶颈:

  · 使用更快的存储(如SSD)

  · 增加数据目录数量,分散I/O负载

  · 优化日志段大小和刷盘策略

  网络瓶颈:

  · 增加网络带宽

  · 优化消息批处理大小

  · 使用更高效的压缩算法

 

感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取