电站管理系统中Kafka理解_kafka协议和104协议
Apache Kafka: 是一个分布式流处理平台,广泛用于处理大量实时数据流。它能够高效地接收、存储、传递和处理数据流。Kafka 的设计目标是提供高吞吐量、低延迟、水平可扩展的消息队列系统。Kafka 广泛应用于日志收集、实时数据处理、数据流分析等场景。
Kafka在电站管理系统中的应用
首先看一下电站管理系统的业务场景:
例如对于电站的管理系统,需要实时处理电站的数据,其业务逻辑为
1. 通过104协议采集电站设备数据,并向电站管理系统传递数据。
2. 数据通过104协议采集后,首先进入kafka中的一个生产者。kafka作为中间层的消息队列,能够对数据进行解耦和异步处理,并将数据以消息的形式发布到Kafka的某个主题中。
3. 消费者(consumer)从Kafka中读取数据。处理程序(比如实时数据分析、异常检测)作为Kafka的消费者,对队列中的数据进行相关的业务处理。
4. 对于需要快速访问的数据,可以将处理后的数据写入redis,提高系统的响应速度。
5. Kafka的消费者也可以将数据写入数据库持久化存储。
Kafka 的核心概念
Kafka 的核心概念包括 Producer(生产者)、Consumer(消费者)、Broker(代理)和 Topic(主题)。
1. Producer(生产者):生产者是向 Kafka 发送数据的客户端。它负责将消息发送到一个或多个 Kafka 主题(topic)。
-
设备数据(JSON格式)发送至Kafka Topic(如
station-data
)。 -
数据示例:
{ \"station_id\": \"station_001\", \"voltage\": 220.5, \"current\": 10.2, \"temperature\": 35.5}
2. Consumer(消费者):消费者从 Kafka 的主题中消费消息。多个消费者可以组成一个Consumer Group,每个 Consumer Group 内的消费者可以并行处理消息。
3. Broker(代理):Kafka Broker 是一个 Kafka 实例,它接收来自生产者的数据,并将其存储到硬盘上。多个 Kafka Broker 形成 Kafka 集群,提供高可用性和负载均衡。
4. Topic(主题):Kafka 中的主题(topic)是消息的类别,生产者将消息发布到特定的主题,消费者从主题中读取消息。
5. Partition(分区):每个 Kafka 主题都被划分为多个分区,分区是 Kafka 中消息存储的基本单元。每个分区是一个有序的消息队列。消息按顺序写入分区,消费时也按顺序读取。
6. Offset(偏移量):每条消息都有一个唯一的 offset,表示该消息在分区中的位置。消费者通过 offset 来追踪自己读取消息的位置。
Kafka 消息传递流程
1. 生产者将消息写入 Kafka 的Topic。
2. Kafka 将消息存储在主题的 Partition中。
3. 消费者从主题的分区中读取消息,通过 offset 来确保消息读取的顺序。
4. 消息在 Kafka 中长期存储,直到设置的过期时间或大小限制。
生产者代码示例
import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;public class StationDataProducer { public static void main(String[] args) { // Kafka配置 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"localhost:9092\"); // 配置key序列化 props.put(\"key.serializer\", StringSerializer.class.getName()); // 配置value序列化 props.put(\"value.serializer\", StringSerializer.class.getName()); // 创建生产者,由于前面序列化配置的是StringSerializer,故而泛型也是String KafkaProducer producer = new KafkaProducer(props); // 构造设备数据 String stationData = \"{\\\"station_id\\\":\\\"station_001\\\",\\\"voltage\\\":220.5,\\\"current\\\":10.2,\\\"temperature\\\":35.5}\"; // 发送数据 ProducerRecord record = new ProducerRecord(\"station-data\", \"station_001\", stationData); producer.send(record); producer.close(); }}
消费者代码示例:
import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.serialization.StringDeserializer;public class StationDataConsumer { public static void main(String[] args) { // Kafka配置 Properties props = new Properties(); props.put(\"bootstrap.servers\", \"localhost:9092\"); props.put(\"group.id\", \"station-data-group\"); // 配置反序列化,与前面的序列化类似 props.put(\"key.deserializer\", StringDeserializer.class.getName()); props.put(\"value.deserializer\", StringDeserializer.class.getName()); // 创建消费者 KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(\"station-data\")); // 持续消费 while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(\"Received: \" + record.value()); // 实际处理逻辑(存储/告警等) } } }}
Kafka 分区
分区的基本概念
-
Topic 是逻辑概念,而 Partition 是物理存储单元。
-
例如:
station-data
Topic 可以划分为多个 Partition(如partition-0
、partition-1
)。
-
-
每个 Partition 是一个有序、不可变的消息队列,数据按写入顺序存储。
-
不同 Partition 之间数据是独立的,没有顺序保证。
分区如何提升并行度?
-
生产者写入时:
-
Kafka 根据 分区策略(如轮询、Key哈希)决定数据写入哪个 Partition。
-
例如:
// 发送消息时,可指定 Key 决定写入哪个 PartitionProducerRecord record = new ProducerRecord(\"station-data\", \"device_001\", data); // Key=\"device_001\"
-
相同 Key 的消息会进入同一个 Partition(保证局部有序)。
-
-
消费者读取时:
-
一个 Partition 只能被同一个消费者组内的一个 Consumer 消费。
-
如果 Topic 有 2 个 Partition,那么:
-
1 个 Consumer:消费所有 Partition(单线程)。
-
2 个 Consumer:每个 Consumer 消费 1 个 Partition(并行处理)。
-
3 个 Consumer:仍然只有 2 个 Consumer 能消费,1 个闲置(Partition 数限制最大并行度)。
-
-