> 文档中心 > 【Kafka】第一篇-初识Kafka

【Kafka】第一篇-初识Kafka


高性能消息中间件Kafka

前言:kafaka之前在两年前就学过了,项目中只会使用,原理以及基本知识,一直就忘记了,现在再捡起来深入学习。

目录导航

  • 高性能消息中间件Kafka
    • What is Kafka?
    • Kafka名称的由来
    • Kafka的发展历程
    • Kafka的版本演进历程
    • Kafka运行环境
    • Kafka核心配置文件
    • Kafka快速体验
    • Kafka Client消息传递开发
    • SpringBoot集成Kafka
    • Kafka后台管理工具
    • Kafka主要用于什么场景

What is Kafka?

官网:http://kafka.apache.org/

kafka是一个开源的,分布式的,高吞吐量的、消息发布和订阅系统,也是大数据中用作数据交换的核心组件之一,以高性能,社区活跃备受广大开发者喜爱;
kafka最初由LinkedIn(领英,全球最大的面向职场人士的社交网站)设计开发,是为了解决LinkedIn的数据管道问题,用于LinkedIn网站的活动流数据和运营数据处理工具,这其中活动流数据是指页面访问量、被查看页面内容方面的信息以及搜索情况等内容,运营数据是指服务器的性能数据(CPU、IO使用率、请求时间、服务日志等数据);
刚开始Linkedin采用的是ActiveMQ来进行数据交换,大约在2010年前后,那时的ActiveMQ还远远无法满足LinkedIn对数据交换传输的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了解决这个问题,LinkedIn决定研发自己的消息传递系统,当时LinkedIn的首席架构师jay kreps便开始组织团队进行消息传递系统的研发;

Kafka名称的由来

由于kafka的架构师jay kreps非常喜欢franz kafka(是奥匈帝国一位使用德语的小说家和短篇犹太人故事家,被评论家们认为是20世纪作家中最具影响力的一位),并且觉得kafka这个名字很酷,因此把这一块消息传递系统取名为kafka;

现在kafka已被大量不同类型的公司采用,国内也被大量使用,作为其内部各种数据的处理工具或消息队列服务;

Kafka的发展历程

2010年底,kafka在Github上开源,初始版本为0.7.0;
2011年7月,因为备受关注,被纳入apache孵化器项目;
2012年10月,kafka从apache孵化器项目毕业,成为apache顶级项目;
2014年,jay kreps离开LinkedIn,成立confluent公司,此后LinkedIn和confluent成为kafka的核心贡组织,致力于kafka的版本迭代升级和推广应用;

Kafka的版本演进历程

Kafka前期项目版本似乎有点凌乱,kafka在1.x之前的版本,是采用4位版本号,比如:0.8.2.2、0.9.0.1、0.10.0.0…等等;

在1.x之后,kafka 采用 Major.Minor.Patch 的三位版本号;

Major表示大版本,通常是一些重大改变,因此彼此之间功能可能会不兼容;Minor表示小版本,通常是一些新功能的增加;

Patch表示修订版,主要为修复一些重点Bug而发布的版本;

比如:Kafka 2.1.3,大版本就是2,小版本是1,Patch版本为3,是为修复Bug发布的第3个版本;

Kafka总共发布了7个大版本,分别是0.7.x、0.8.x、0.9.x、0.10.x、0.11.x、1.x及2.x版本,截止目前,最新版本是Kafka 2.5.0,也是最新稳定版本; 

kafka是由Scala语言编写而成,Scala 运行在Java虚拟机上,并兼容现有的Java程序,因此部署kakfa的时候,需要先安装JDK环境;

Kafka源码: https://github.com/apache/kafka

Scala:https://www.scala-lang.org/

Kafka运行环境

1、Linux上需要安装Java环境;(省略)2、Linux上安装Zookeeper;(第一期学习过)官网:https://zookeeper.apache.org/下载:https://zookeeper.apache.org/releases.html 解压:tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz解压后zookeeper即完成了安装,然后配置zookeeper:切换到conf目录下:cd conf复制一个zoo_sample.cfg到zoo.cfg文件;dataDir=/usr/local/apache-zookeeper-3.6.1-bin/dataadmin.serverPort=8907  (添加一下该配置项)切换到zookeeper安装主目录下的bin目录:cd apache-zookeeper-3.6.1-bin/bin启动zookeeper:./zkServer.sh start连接Zookeeper图形界面客户端:ZooInspector客户端软件; 3、Linux上安装Kafka;下载:http://kafka.apache.org/downloads 其中kafka_2.13中的2.13表示的是scala的版本,因为Kafka服务器代码完全由Scala语言编写,- 后面的2.5.0表示kafka的版本;解压:tar -zxvf kafka_2.13-2.5.0.tgz解压后即完成了安装,不需要做其他操作;切换到解压后的目录:cd kafka_2.13-2.5.0修改server.properties配置:listeners=PLAINTEXT://192.168.172.127:9092advertised.listeners=PLAINTEXT://192.168.172.127:9092zookeeper.connect=localhost:2181 启动kafka:./kafka-server-start.sh ../config/server.properties &Kafka启动后默认端口:9092查看kafka进程:ps -ef | grep kafka 或者 jps注:kafka启动需要先启动zookeeper;关闭kafka:./kafka-server-stop.sh ../config/server.properties

Kafka核心配置文件

config/server.properties配置文件

#唯一的服务器id,每台机器分别不一样broker.id=1 #监听ip和端口listeners=PLAINTEXT://192.168.172.131:9092 #节点需要绑定的主机名称,如果没有设置,服务器会绑定到所有接口host.name=192.168.172.131 #节点的主机名会通知给生产者和消费者,如果没有设置,它将会使用"host.name"的值(前提是设置了host.name),否则他会使用java.net.InetAddress.getCanonicalHostName()的返回值advertised.listeners=PLAINTEXT://192.168.172.131:9092 # 接受网络请求的线程数num.network.threads=3 # 进行磁盘IO的线程数num.io.threads=8 # 套接字服务器使用的发送缓冲区大小socket.send.buffer.bytes=102400 # 套接字服务器使用的接收缓冲区大小socket.receive.buffer.bytes=102400 #  单个请求最大能接收的数据量socket.request.max.bytes=104857600 #用来存储日志文件log.dirs=/tmp/kafka-logs # 每个主题的日志分区的默认数量,更多的分区允许更大的并行操作,但是它会导致节点产生更多的文件num.partitions=1 # 每个数据目录中的线程数,用于在启动时日志恢复,并在关闭时刷新num.recovery.threads.per.data.dir=1 # 在强制刷新数据到磁盘执行时允许接收消息的数量#log.flush.interval.messages=10000 # 在强制刷新之前,消息可以在日志中停留的最长时间#log.flush.interval.ms=1000 #一个日志的最小存活时间,可以被删除log.retention.hours=168 #  一个基于大小的日志保留策略,段将被从日志中删除只要剩下的部分段不低于log.retention.bytes#log.retention.bytes=1073741824 #  每一个日志段大小的最大值。当到达这个大小时,会生成一个新的片段。log.segment.bytes=1073741824 # 检查日志段的时间间隔,看是否可以根据保留策略删除它们log.retention.check.interval.ms=300000 # 这是一个以逗号为分割的部分,每一个都匹配一个Zookeeper,选举的时候依赖于zookeeper,写下zookeeper的地址zookeeper.connect=192.168.72.130:2181,192.168.72.131:2181,192.168.72.132:2181 # 连接到Zookeeper的超时时间zookeeper.connection.timeout.ms=6000

Kafka快速体验

创建 topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

replication-factor 表示该 topic 需要在不同的 broker 中保存几份,这里设置成 1,表示在两个 broker 中保存两份;Partitions 表示分区数查看 topic ./kafka-topics.sh --list --zookeeper localhost:2181查看 topic 属性 ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test消费消息 ./kafka-console-consumer.sh -bootstrap-server 192.168.172.127:9092 --topic test --from-beginning发送消息 ./kafka-console-producer.sh --broker-list 192.168.172.127:9092 --topic test

Kafka Client消息传递开发

    org.apache.kafka    kafka-clients    2.5.0 生产者的几个配置项acks

acks 表示 producer 发送消息到 broker 上以后的确认值,有三个取值:

0:表示 producer 不需要等待 broker 的消息确认,该选项延时最小但同时风险最大(当 server 宕机时,数据将会丢失);

1:表示 producer 只需要获得 kafka 集群中的 leader 节点确认即可,该选项延时较小同时确保了leader 节点确认接收成功;

-1:需要 ISR 中所有的 Replica 给予接收确认,速度最慢,安全性最高;

batch.size

生产者发送多个消息到 broker 上的同一个分区时,为了减少网络请求带来的

性能开销,通过批量的方式来提交消息,可以通过这个参数来控制批量提交的

字节数大小,默认大小是 16384byte,也就是 16kb,意味着当一批消息大小达到指定的 batch.size 的时候会统一发送;

linger.ms

Producer 默认会把两次发送时间间隔内收集到的所有 Requests 进行一次聚合然后再发送,以此提高吞吐量,而 linger.ms 就是为每次发送到 broker 的请求增加一些 delay,以此来聚合更多的 Message 请求,这个有点想 TCP 里面的Nagle 算法,在 TCP 协议的传输中,为了减少大量小数据包的发送,采用了Nagle 算法,也就是基于小包的等-停协议。

batch.size 和 linger.ms 这两个参数是 kafka 性能优化的关键参数,如果两个都配置了,那么只要满足其中一个要求,就会发送请求到 broker 上;

max.request.size

设置请求的数据的最大字节数,为了防止发生较大的数据包影响吞吐量,默

认值为 1MB;

消费端的几个配置项

group.id

consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。一个组里可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 group ID,组内的所有消费者协调在一起来消费订阅主题(subscribed topics)上的所有分区(partition)的消息。但是每个分区只能由同一个消费组内的一个 consumer 来消费;

enable.auto.commit

消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接

收到,还可以配合 auto.commit.interval.ms 控制自动提交的频率,当然,也可以通过 consumer.commitSync()的方式实现手动提交;

auto.offset.reset

该参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来消费指定的 topic 时,对于该参数的配置,会有不同的语义

auto.offset.reset=latest 情况下,新的消费者将会从其他消费者最后消费的

offset处开始消费 Topic 下的消息;

auto.offset.reset= earliest 情况下,新的消费者会从该 topic 最早的消息开始消费;

auto.offset.reset=none 情况下,新的消费者加入以后,由于之前不存在

offset,则会直接抛出异常;

max.poll.records

此设置限制每次调用 poll 返回的消息数;

SpringBoot集成Kafka

    org.springframework.kafka    spring-kafka

application.properties配置文件的配置项说明:

#################consumer的配置参数(开始)#################

#如果’enable.auto.commit’为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。

spring.kafka.consumer.auto-commit-interval;

#当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量

#可选的值为latest, earliest, none

spring.kafka.consumer.auto-offset-reset=latest;

#以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接

spring.kafka.consumer.bootstrap-servers;

#ID在发出请求时传递给服务器;用于服务器端日志记录

spring.kafka.consumer.client-id;

#如果为true,则消费者的偏移量将在后台定期提交,默认值为true

spring.kafka.consumer.enable-auto-commit=true;

#如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答获取请求之前将阻塞的最长时间(以毫秒为单位)

#默认值为500

spring.kafka.consumer.fetch-max-wait;

#服务器应以字节为单位返回获取请求的最小数据量,默认值为1,对应的kafka的参数为fetch.min.bytes;

spring.kafka.consumer.fetch-min-size;

#用于标识此使用者所属的使用者组的唯一字符串

spring.kafka.consumer.group-id;

#心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000

spring.kafka.consumer.heartbeat-interval;

#密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#一次调用poll()操作时返回的最大记录数,默认值为500

spring.kafka.consumer.max-poll-records;

#################consumer的配置参数(结束)#################

#################producer的配置参数(开始)#################

#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:

#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。

#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。

#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。

#可以设置的值为:all, -1, 0, 1

spring.kafka.producer.acks=1

#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,

#这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384

spring.kafka.producer.batch-size=16384

#以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接

spring.kafka.producer.bootstrap-servers

#生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432

spring.kafka.producer.buffer-memory=33554432

#ID在发出请求时传递给服务器,用于服务器端日志记录

spring.kafka.producer.client-id

#生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器(‘gzip’,‘snappy’,‘lz4’),

#它还接受’uncompressed’以及’producer’,分别表示没有压缩以及保留生产者设置的原始压缩编解码器,

#默认值为producer

spring.kafka.producer.compression-type=producer

#key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

#值的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#如果该值大于零时,表示启用重试失败的发送次数

spring.kafka.producer.retries

#################producer的配置参数(结束)#################

#################listener的配置参数(结束)#################

#侦听器的AckMode,参见https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets

#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效

spring.kafka.listener.ack-mode;

#在侦听器容器中运行的线程数

spring.kafka.listener.concurrency;

#轮询消费者时使用的超时(以毫秒为单位)

spring.kafka.listener.poll-timeout;

#当ackMode为“COUNT”或“COUNT_TIME”时,偏移提交之间的记录数

spring.kafka.listener.ack-count;

#当ackMode为“TIME”或“COUNT_TIME”时,偏移提交之间的时间(以毫秒为单位)

spring.kafka.listener.ack-time;

#################listener的配置参数(结束)#################

Kafka后台管理工具

Kafka-tool

一个图形界面的工具,对目前最新版本的kafka2.5.0还不支持或者有点问题;

下载地址:https://www.kafkatool.com/

cmak管控台 (原来名字:kafka-manager)

一个web后台管理系统,可以管理kafka;

项目地址: https://github.com/yahoo/CMAK

注意该管控台运行需要JDK11或以上版本的支持;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1HOeVdUg-1650986342382)(file:///C:/Users/ADMINI~1/AppData/Local/Temp/msohtmlclip1/01/clip_image002.jpg)]

下载:https://github.com/yahoo/CMAK/releases

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PoC7NU09-1650986342384)(file:///C:/Users/ADMINI~1/AppData/Local/Temp/msohtmlclip1/01/clip_image004.jpg)]

下载下来是一个zip压缩包,直接 unzip解压:

unzip cmak-3.0.0.5.zip

解压后即完成了安装;

修改conf目录下的application.conf配置文件:

kafka-manager.zkhosts=“192.168.172.127:2181”

#kafka-manager.zkhosts=${?ZK_HOSTS}

cmak.zkhosts=“192.168.172.127:2181”

#cmak.zkhosts=${?ZK_HOSTS}

切换到bin目录下执行:

./cmak -Dconfig.file=…/conf/application.conf -java-home /usr/local/jdk-11.0.6

其中-Dconfig.file是指定配置文件,-java-home是指定jdk11所在位置,如果机器上已经是jdk11,则不需要指定;

启动之后默认端口是9000,然后访问:http://192.168.172.127:9000/

Kafka主要用于什么场景

1、消息系统:替换传统的消息系统,解耦系统或缓存待处理的数据,kafka有更好的吞吐量(每秒几十万),内置了分片、复制、容错机制,是大规模数据消息处理的更好的解决方案;

2、网站活动跟踪:网站的访问量,搜索量,或者其他用户的活动行为如注册,充值,支付,购买等行为可以发布到中心的topic,每种类型可以作为一个topic,这些信息流可以被消费者订阅实时处理、实时监控或者将数据流加载到Hadoop中进行离线处理等;

3、度量统计:可以用于度量统计一些运维监控数据,将分布式的一些监控数据聚集到一起;

4、日志聚合:可以作为一个日志聚合的替换方案,如Scribe、Flume;

5、数据流处理:可以对数据进行分级处理,将从kafka获取的原始数据进行加工润色后再发布至kafka;

6、事件溯源:可以以时间为顺序记录应用事件的状态变化,从而为事件溯源。

7、提交日志:可以作为分布式系统的外部日志存储介质。

总结一下:kafka主要用于两个方面,一个是用来做消息队列,另一个是用来做数据流转平台(比如大数据实时处理领域);