> 技术文档 > kafka4.0新版本体验

kafka4.0新版本体验


Kafka 4.0 与 Kafka 2.x 对比:架构演进与功能升级

Apache Kafka 自发布以来,凭借其高吞吐、持久化、分布式等特性,已成为大数据生态系统中实时流处理的核心组件。随着版本的不断演进,Kafka 在性能、可维护性和易用性方面持续优化。尤其是从 Kafka 3.x 开始,社区逐步推进去 ZooKeeper 化(KRaft 模式),并在 Kafka 4.0 中进一步完善了这一架构变革。

1. 架构对比(ZooKeeper vs KRaft)

在 Kafka 2.x 版本中,Kafka 强烈依赖 ZooKeeper 来管理元数据和集群协调。然而,随着集群规模的增长,这种依赖性带来了性能瓶颈和运维复杂度。自 Kafka 3.0 开始引入了 KRaft 模式,并在 Kafka 4.0 中进一步完善,旨在彻底移除对 ZooKeeper 的依赖。

  • Kafka 2.x: 使用 ZooKeeper 存储集群状态、配置和分区信息。这导致额外的系统开销,并且需要维护 ZooKeeper 集群。
  • Kafka 4.0: 引入了基于 Raft 共识算法的 KRaft 控制器,将元数据管理和存储直接集成到 Kafka 内部,减少了外部依赖,提升了系统的可扩展性和稳定性。

2. 核心功能增强(Controller Quorum、Metadata Management)

Controller Quorum

在 Kafka 2.x 中,单个控制器负责整个集群的状态管理。如果该控制器出现故障,会导致短暂的服务中断。而在 Kafka 4.0 中,通过引入多个控制器形成一个 Quorum,增强了系统的容错能力和可用性。

Metadata Management

Kafka 4.0 提供了更高效的元数据管理机制,允许更快地传播和应用配置变更,同时减少不必要的网络通信和磁盘I/O。

3. 性能与稳定性提升

  • 性能优化: Kafka 4.0 在日志压缩、消息复制等方面进行了多项优化,提高了吞吐量并降低了延迟。
  • 稳定性改进: 新版本中加强了对异常情况的处理能力,比如更好的网络分区恢复机制,以及更加健壮的日志清理策略。

4. 配置参数变化

随着架构的变迁,部分配置参数在 Kafka 4.0 中有所调整或新增。例如,与 KRaft 相关的 controller.quorum.votersprocess.roles 参数被引入,用于定义控制器角色及投票成员。

5. 升级注意事项与兼容性

从 Kafka 2.x 升级至 Kafka 4.0 需要特别注意以下几点:

  • 数据迁移: 确保现有数据能够平滑过渡到新的存储格式。
  • 配置更新: 修改旧版配置文件以适应新版要求。
  • 测试验证: 在生产环境部署前,务必进行全面的功能测试和负载测试。

1、开始部署测试,以centos8纯净系统为例

1.更新yum源为aliyun --环境已准备好的,可跳过

sudo cp /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup

使用wget命令下载阿里云提供的CentOS Base源配置文件。

sudo wget -O /etc/yum.repos.d/CentOS-Base.repo  http://mirrors.aliyun.com/repo/Centos-8.repo

清理缓存并生成新的缓存

下载完成后,清除旧的yum缓存,并生成新的缓存,以确保系统能够识别新的repo源

sudo yum clean all

sudo yum makecache

更新系统- 可选

yum update -y

yum install gcc* -y

2、安装 java17:

yum install -y yum-utils

yum install -y java-17-openjdk java-17-openjdk-devel

3、下载kafka

访问官网找到最新的下载链接 https://kafka.apache.org/downloads

cd /opt

wget  https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz

tar zxvf kafka_2.13-4.0.0.tgz

mv kafka_2.13-4.0.0 kafka

更改kafka数据存储目录

kafka 默认数据存储的目录是/tmp下的,一般根目录比较小,这里改成/data/kafka

mkdir -p /data/kafka

sudo sed -i \'s#log.dirs=/tmp/kraft-combined-logs#log.dirs=/data/kafka#g\' /opt/kafka/config/server.properties

初始化kafka

kafka角色 分为Broker 和 Controller, 集群模式下可以分开部署,这里只有1台,所以直接一起初始化了,这一步是因为kafka3.0之后去掉了zookeeper,改成KRaft模式,kafka2.x不需要

export KAFKA_CLUSTER_ID=\"$(uuidgen)\"

/opt/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/config/server.properties

/opt/kafka/bin/kafka-storage.sh format -t \"$(uuidgen)\" -c /opt/kafka/config/server.properties --standalone

启动kafka

这里就可以启动kafka了,默认是只能localhost本机访问,无加密方式

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

看到INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer) 就代表启动成功了

配置远程访问

1、selinux开放9093端口

确认当前 SELinux(Security-Enhanced Linux)状态:

getenforce
  • 1.

输出可能是以下三种之一:

  • Enforcing:SELinux 正常启用并强制执行安全策略。
  • Permissive:SELinux 启用但不会强制执行,仅记录日志。
  • Disabled:SELinux 已关闭。

开放端口

sudo firewall-cmd --permanent --add-port=9093/tcp

sudo firewall-cmd --reload

验证端口开放状态

root@localhost /opt# sudo firewall-cmd --list-ports

9093/tcp

修改kafka配置文件,允许远程访问

vi /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

找到 advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093

替换为 advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://10.0.70.189:9093

启动测试

/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

看到INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer) 就代表启动成功了

从其他机器telnet kafka的9093端口

telnet 10.0.70.189 9093Trying 10.0.70.189...Connected to 10.0.70.189.Escape character is \'^]\'.
  • 1.
  • 2.
  • 3.
  • 4.

如果不通,先到服务器上看下有没有监听,netstat -antlp | grep 9093 监听的地址如果是127那说明没改成功,杀掉进程重新拉起来,

设置为系统服务

vim /etc/systemd/system/kafka.service

[Unit]

Description=Apache Kafka Server

Documentation=http://kafka.apache.org/documentation.html

After=network.target

[Service]

# 指定运行 Kafka 的用户和组

User=root

Group=root

# Kafka 启动命令路径和配置文件路径

ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

ExecStop=/opt/kafka/bin/kafka-server-stop.sh

# 限制重启次数防止无限重启

Restart=on-failure

RestartSec=30

# 设置工作目录

WorkingDirectory=/data/kafka

# 日志输出设置(可选)

StandardOutput=file:/var/log/kafka/kafka.stdout.log

StandardError=file:/var/log/kafka/kafka.stderr.log

[Install]

WantedBy=multi-user.target

设置开机启动

systemctl start kafka

systemctl enable kafka

确认服务状态

systemctl status kafka

查看日志可以用  journalctl -u kafka -f