pyspark中的kafka的读和写案例操作
下面将详细讲解 PySpark 中操作 Kafka 进行数据读写的案例,包括必要的配置、代码实现和关键参数说明。
PySpark 与 Kafka 集成基础
PySpark 通过 Spark Streaming 或 Structured Streaming 与 Kafka 集成,需要引入特定的依赖包。通常使用spark-sql-kafka-0-10_2.12
包,版本需要与 Spark 版本匹配。
读取 Kafka 数据(消费消息)
从 Kafka 读取数据可以分为批处理和流处理两种方式:
from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, from_jsonfrom pyspark.sql.types import StructType, StringType, IntegerType# 初始化SparkSessionspark = SparkSession.builder \\ .appName(\"KafkaReaderExample\") \\ .config(\"spark.jars.packages\", \"org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0\") \\ .getOrCreate()# 1. 流处理方式读取Kafka数据def stream_read_kafka(): # 配置Kafka连接参数 kafka_df = spark.readStream \\ .format(\"kafka\") \\ .option(\"kafka.bootstrap.servers\", \"localhost:9092\") \\ .option(\"subscribe\", \"test_topic\") # 订阅的主题,可以是多个用逗号分隔 .option(\"startingOffsets\", \"earliest\") # 从最早的偏移量开始消费 .load() # Kafka返回的数据包含多个字段,我们主要关注value字段(实际消息内容) # 将二进制的value转换为字符串 kafka_df = kafka_df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\") # 如果消息是JSON格式,可以进一步解析 schema = StructType() \\ .add(\"id\", IntegerType()) \\ .add(\"name\", StringType()) \\ .add(\"age\", IntegerType()) parsed_df = kafka_df.select( from_json(col(\"value\"), schema).alias(\"data\") ).select(\"data.*\") # 输出到控制台(调试用) query = parsed_df.writeStream \\ .outputMode(\"append\") \\ .format(\"console\") \\ .start() query.awaitTermination()# 2. 批处理方式读取Kafka数据def batch_read_kafka(): kafka_df = spark.read \\ .format(\"kafka\") \\ .option(\"kafka.bootstrap.servers\", \"localhost:9092\") \\ .option(\"subscribe\", \"test_topic\") \\ .option(\"startingOffsets\", \"\"\"{\"test_topic\":{\"0\":0}}\"\"\") # 指定分区和偏移量 .option(\"endingOffsets\", \"\"\"{\"test_topic\":{\"0\":100}}\"\"\") \\ .load() # 转换为字符串并展示 result_df = kafka_df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\") result_df.show(truncate=False)if __name__ == \"__main__\": # 选择运行流处理或批处理 # stream_read_kafka() batch_read_kafka()
写入 Kafka 数据(生产消息)
同样,写入 Kafka 也支持流处理和批处理两种方式:
from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col, to_json, struct# 初始化SparkSessionspark = SparkSession.builder \\ .appName(\"KafkaWriterExample\") \\ .config(\"spark.jars.packages\", \"org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0\") \\ .getOrCreate()# 1. 流处理方式写入Kafkadef stream_write_kafka(): # 创建测试数据 data = [(\"1\", \"Alice\", 25), (\"2\", \"Bob\", 30), (\"3\", \"Charlie\", 35)] df = spark.createDataFrame(data, [\"id\", \"name\", \"age\"]) # 转换为Kafka所需的格式(必须包含key和value字段) kafka_df = df.select( col(\"id\").alias(\"key\"), # key字段 to_json(struct(\"id\", \"name\", \"age\")).alias(\"value\") # value字段转为JSON ) # 写入Kafka query = kafka_df.writeStream \\ .format(\"kafka\") \\ .option(\"kafka.bootstrap.servers\", \"localhost:9092\") \\ .option(\"topic\", \"test_topic\") \\ .option(\"checkpointLocation\", \"/tmp/kafka_checkpoint\") # 流处理必须设置检查点 .start() query.awaitTermination()# 2. 批处理方式写入Kafkadef batch_write_kafka(): # 创建测试数据 data = [(\"4\", \"David\", 40), (\"5\", \"Eve\", 45)] df = spark.createDataFrame(data, [\"id\", \"name\", \"age\"]) # 转换为Kafka所需格式 kafka_df = df.select( col(\"id\").cast(\"string\").alias(\"key\"), to_json(struct(\"id\", \"name\", \"age\")).alias(\"value\") ) # 写入Kafka kafka_df.write \\ .format(\"kafka\") \\ .option(\"kafka.bootstrap.servers\", \"localhost:9092\") \\ .option(\"topic\", \"test_topic\") \\ .save()if __name__ == \"__main__\": # 选择运行流处理或批处理 # stream_write_kafka() batch_write_kafka()
关键参数说明
-
连接参数:
kafka.bootstrap.servers
:Kafka 集群的地址列表,格式为host:port
subscribe
:要订阅的主题,多个主题用逗号分隔topic
:写入时指定的目标主题
-
偏移量设置:
startingOffsets
:读取的起始偏移量,earliest
(最早)或latest
(最新)endingOffsets
:批处理时的结束偏移量
-
数据格式:
- Kafka 中的数据以二进制形式存储,需要转换为字符串:
CAST(key AS STRING)
和CAST(value AS STRING)
- 写入时需要将数据转换为包含
key
和value
字段的 DataFrame
- Kafka 中的数据以二进制形式存储,需要转换为字符串:
-
流处理特殊参数:
checkpointLocation
:必须设置,用于保存流处理的状态信息outputMode
:输出模式,常用append
(追加)
运行注意事项
- 确保 Kafka 服务已启动并正常运行
- 主题需要提前创建:
kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- 依赖包版本需要与 Spark 版本匹配,例如 Spark 3.3.0 对应
spark-sql-kafka-0-10_2.12:3.3.0
- 流处理程序需要手动停止,可通过
query.stop()
或 Ctrl+C 终止
通过以上示例,你可以实现 PySpark 与 Kafka 之间的数据交互,根据实际需求选择批处理或流处理方式。