kafka4.0新版本体验
Kafka 4.0 与 Kafka 2.x 对比:架构演进与功能升级
Apache Kafka 自发布以来,凭借其高吞吐、持久化、分布式等特性,已成为大数据生态系统中实时流处理的核心组件。随着版本的不断演进,Kafka 在性能、可维护性和易用性方面持续优化。尤其是从 Kafka 3.x 开始,社区逐步推进去 ZooKeeper 化(KRaft 模式),并在 Kafka 4.0 中进一步完善了这一架构变革。
1. 架构对比(ZooKeeper vs KRaft)
在 Kafka 2.x 版本中,Kafka 强烈依赖 ZooKeeper 来管理元数据和集群协调。然而,随着集群规模的增长,这种依赖性带来了性能瓶颈和运维复杂度。自 Kafka 3.0 开始引入了 KRaft 模式,并在 Kafka 4.0 中进一步完善,旨在彻底移除对 ZooKeeper 的依赖。
- Kafka 2.x: 使用 ZooKeeper 存储集群状态、配置和分区信息。这导致额外的系统开销,并且需要维护 ZooKeeper 集群。
- Kafka 4.0: 引入了基于 Raft 共识算法的 KRaft 控制器,将元数据管理和存储直接集成到 Kafka 内部,减少了外部依赖,提升了系统的可扩展性和稳定性。
2. 核心功能增强(Controller Quorum、Metadata Management)
Controller Quorum
在 Kafka 2.x 中,单个控制器负责整个集群的状态管理。如果该控制器出现故障,会导致短暂的服务中断。而在 Kafka 4.0 中,通过引入多个控制器形成一个 Quorum,增强了系统的容错能力和可用性。
Metadata Management
Kafka 4.0 提供了更高效的元数据管理机制,允许更快地传播和应用配置变更,同时减少不必要的网络通信和磁盘I/O。
3. 性能与稳定性提升
- 性能优化: Kafka 4.0 在日志压缩、消息复制等方面进行了多项优化,提高了吞吐量并降低了延迟。
- 稳定性改进: 新版本中加强了对异常情况的处理能力,比如更好的网络分区恢复机制,以及更加健壮的日志清理策略。
4. 配置参数变化
随着架构的变迁,部分配置参数在 Kafka 4.0 中有所调整或新增。例如,与 KRaft 相关的 controller.quorum.voters
和 process.roles
参数被引入,用于定义控制器角色及投票成员。
5. 升级注意事项与兼容性
从 Kafka 2.x 升级至 Kafka 4.0 需要特别注意以下几点:
- 数据迁移: 确保现有数据能够平滑过渡到新的存储格式。
- 配置更新: 修改旧版配置文件以适应新版要求。
- 测试验证: 在生产环境部署前,务必进行全面的功能测试和负载测试。
1、开始部署测试,以centos8纯净系统为例
1.更新yum源为aliyun --环境已准备好的,可跳过
sudo cp /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup
使用wget命令下载阿里云提供的CentOS Base源配置文件。
sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-8.repo
清理缓存并生成新的缓存
下载完成后,清除旧的yum缓存,并生成新的缓存,以确保系统能够识别新的repo源
sudo yum clean all
sudo yum makecache
更新系统- 可选
yum update -y
yum install gcc* -y
2、安装 java17:
yum install -y yum-utils
yum install -y java-17-openjdk java-17-openjdk-devel
3、下载kafka
访问官网找到最新的下载链接 https://kafka.apache.org/downloads
cd /opt
wget https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar zxvf kafka_2.13-4.0.0.tgz
mv kafka_2.13-4.0.0 kafka
更改kafka数据存储目录
kafka 默认数据存储的目录是/tmp下的,一般根目录比较小,这里改成/data/kafka
mkdir -p /data/kafka
sudo sed -i \'s#log.dirs=/tmp/kraft-combined-logs#log.dirs=/data/kafka#g\' /opt/kafka/config/server.properties
初始化kafka
kafka角色 分为Broker 和 Controller, 集群模式下可以分开部署,这里只有1台,所以直接一起初始化了,这一步是因为kafka3.0之后去掉了zookeeper,改成KRaft模式,kafka2.x不需要
export KAFKA_CLUSTER_ID=\"$(uuidgen)\"
/opt/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/config/server.properties
/opt/kafka/bin/kafka-storage.sh format -t \"$(uuidgen)\" -c /opt/kafka/config/server.properties --standalone
启动kafka
这里就可以启动kafka了,默认是只能localhost本机访问,无加密方式
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
看到INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer) 就代表启动成功了
配置远程访问
1、selinux开放9093端口
确认当前 SELinux(Security-Enhanced Linux)状态:
getenforce
- 1.
输出可能是以下三种之一:
Enforcing
:SELinux 正常启用并强制执行安全策略。Permissive
:SELinux 启用但不会强制执行,仅记录日志。Disabled
:SELinux 已关闭。
开放端口
sudo firewall-cmd --permanent --add-port=9093/tcp
sudo firewall-cmd --reload
验证端口开放状态
root@localhost /opt# sudo firewall-cmd --list-ports
9093/tcp
修改kafka配置文件,允许远程访问
vi /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
找到 advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
替换为 advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://10.0.70.189:9093
启动测试
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
看到INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer) 就代表启动成功了
从其他机器telnet kafka的9093端口
telnet 10.0.70.189 9093Trying 10.0.70.189...Connected to 10.0.70.189.Escape character is \'^]\'.
- 1.
- 2.
- 3.
- 4.
如果不通,先到服务器上看下有没有监听,netstat -antlp | grep 9093 监听的地址如果是127那说明没改成功,杀掉进程重新拉起来,
设置为系统服务
vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
After=network.target
[Service]
# 指定运行 Kafka 的用户和组
User=root
Group=root
# Kafka 启动命令路径和配置文件路径
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
# 限制重启次数防止无限重启
Restart=on-failure
RestartSec=30
# 设置工作目录
WorkingDirectory=/data/kafka
# 日志输出设置(可选)
StandardOutput=file:/var/log/kafka/kafka.stdout.log
StandardError=file:/var/log/kafka/kafka.stderr.log
[Install]
WantedBy=multi-user.target
设置开机启动
systemctl start kafka
systemctl enable kafka
确认服务状态
systemctl status kafka
查看日志可以用 journalctl -u kafka -f