蚂蚁面试:Kafka 如何做压测?如何保证系统稳定?_kafka压测
Kafka是大数据领域应用非常广泛的消息中间件,如何确定Kafka集群的最大吞吐量和延迟呢?又如何保证Kafka集群的稳定呢?今天我们来介绍Kafka压测方案,来确认Kafka集群的各类指标。
一、Kafka自带性能测试工具
Kafka提供了内置的性能测试工具,可以用于生产者和消费者的基准测试:
·生产者性能测试工具:kafka-producer-perf-test.sh
· 消费者性能测试工具:kafka-consumer-perf-test.sh
第三方压测工具JMeter:
· 可以使用JMeter的Kafka插件进行压测Tsung
· 支持Kafka协议的分布式压测工具Gatling
· 可以通过Kafka插件进行压测
二、压测场景设计
1. 生产者性能测试
测试不同消息大小、批处理设置和压缩算法对生产者性能的影响:
# 测试100字节消息,无压缩 /opt/kafka/bin/kafka-producer-perf-test.sh \\ --topic test-topic \\ --num-records 10000000 \\ --record-size 100 \\ --throughput -1 \\ --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\ acks=1 \\ batch.size=16384 \\ linger.ms=0 \\ compression.type=none # 测试1KB消息,使用lz4压缩 /opt/kafka/bin/kafka-producer-perf-test.sh \\ --topic test-topic \\ --num-records 10000000 \\ --record-size 1024 \\ --throughput -1 \\ --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\ acks=1 \\ batch.size=65536 \\ linger.ms=10 \\ compression.type=lz4
2. 消费者性能测试
测试不同消费者组配置和分区数对消费性能的影响:
# 基本消费者性能测试 /opt/kafka/bin/kafka-consumer-perf-test.sh \\ --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \\ --topic test-topic \\ --messages 10000000 \\ --threads 1 \\ --print-metrics # 多线程消费者测试 /opt/kafka/bin/kafka-consumer-perf-test.sh \\ --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \\ --topic test-topic \\ --messages 10000000 \\ --threads 8 \\ --print-metrics
3. 端到端延迟测试
测量从生产到消费的端到端延迟:
# 创建一个具有多个分区的测试主题 /opt/kafka/bin/kafka-topics.sh \\ --bootstrap-server broker1:9092 \\ --create \\ --topic latency-test \\ --partitions 8 \\ --replication-factor 3 # 使用自定义Java程序测量端到端延迟 // 生产者代码示例 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"broker1:9092,broker2:9092,broker3:9092\"); props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); props.put(\"acks\", \"all\"); Producer producer = new KafkaProducer(props); for (int i = 0; i < 10000; i++) { long timestamp = System.currentTimeMillis(); ProducerRecord record = new ProducerRecord(\"latency-test\", null, timestamp, \"key-\" + i, \"value-\" + timestamp); producer.send(record); Thread.sleep(100); // 每秒发送10条消息 } producer.close(); // 消费者代码示例 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"broker1:9092,broker2:9092,broker3:9092\"); props.put(\"group.id\", \"latency-test-group\"); props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); props.put(\"auto.offset.reset\", \"earliest\"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(\"latency-test\")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { long latency = System.currentTimeMillis() - record.timestamp(); System.out.printf(\"Offset = %d, Latency = %d ms%n\", record.offset(), latency); } }
4. 吞吐量与延迟权衡测试
测试不同配置下吞吐量与延迟的权衡关系:
# 高吞吐量配置测试 /opt/kafka/bin/kafka-producer-perf-test.sh \\ --topic throughput-test \\ --num-records 5000000 \\ --record-size 1024 \\ --throughput -1 \\ --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\ acks=1 \\ batch.size=131072 \\ linger.ms=50 \\ compression.type=lz4 \\ buffer.memory=67108864 # 低延迟配置测试 /opt/kafka/bin/kafka-producer-perf-test.sh \\ --topic latency-test \\ --num-records 1000000 \\ --record-size 1024 \\ --throughput -1 \\ --producer-props bootstrap.servers=broker1:9092,broker2:9092,broker3:9092 \\ acks=1 \\ batch.size=8192 \\ linger.ms=0 \\ compression.type=none
三、压测指标分析
1. 生产者关键指标
吞吐量(Throughput):每秒处理的消息数或字节数
延迟(Latency):消息从发送到确认的时间
CPU使用率:生产者进程的CPU使用情况
内存使用率:生产者进程的内存使用情况
批处理率:每批次的平均消息数
2. 消费者关键指标
吞吐量:每秒消费的消息数或字节数
延迟:消息从生产到消费的时间
消费者滞后(Consumer Lag):消费者落后于生产者的消息数
处理时间:消费者处理每条消息的时间
提交率:偏移量提交的频率和成功率
3. Broker关键指标
请求处理率:每秒处理的请求数
请求队列大小:等待处理的请求数
网络吞吐量:进出Broker的网络流量
磁盘使用率:日志文件的增长速率
GC暂停时间:垃圾收集对性能的影响
四、压测结果解读
1. 生产者性能分析
以下是一个典型的生产者性能测试结果示例:
100000 records sent, 25000.0 records/sec (24.41 MB/sec), 15.2 ms avg latency, 293.0 ms max latency. 200000 records sent, 26315.8 records/sec (25.67 MB/sec), 12.8 ms avg latency, 128.0 ms max latency. 300000 records sent, 27272.7 records/sec (26.61 MB/sec), 11.5 ms avg latency, 98.0 ms max latency.
结果解读:
吞吐量随时间稳定在约26,000条记录/秒(约25MB/秒)
平均延迟约为13毫秒,最大延迟为293毫秒
随着测试进行,延迟趋于稳定,表明系统性能良好
2. 消费者性能分析
以下是一个典型的消费者性能测试结果示例:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2023-05-01 10:00:00, 2023-05-01 10:01:00, 1024.00, 17.07, 1048576, 17476.27, 20, 60000, 17.07, 17476.27
结果解读:
消费速率为17.07MB/秒,约17,476条消息/秒
重平衡时间为20毫秒,表明消费者组协调效率高
获取时间为60秒,与测试持续时间一致
3. 瓶颈识别与解决
常见的性能瓶颈及解决方案:
CPU瓶颈:
·增加broker数量
· 优化消息压缩算法
· 调整JVM参数
内存瓶颈:
· 增加堆内存大小
· 优化生产者/消费者客户端缓冲区大小
· 减少不必要的对象创建
磁盘I/O瓶颈:
· 使用更快的存储(如SSD)
· 增加数据目录数量,分散I/O负载
· 优化日志段大小和刷盘策略
网络瓶颈:
· 增加网络带宽
· 优化消息批处理大小
· 使用更高效的压缩算法
感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:
这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取