HoRain云--Spark Streaming 是如何实现流式处理的?
🎬 HoRain 云小助手:个人主页
⛺️生活的理想,就是为了理想的生活!
⛳️ 推荐
前些天发现了一个超棒的服务器购买网站,性价比超高,大内存超划算!忍不住分享一下给大家。点击跳转到网站。
目录
⛳️ 推荐
1. 核心模型:DStream(离散流)
2. 微批处理架构
(1) 数据接收与分块
(2) 数据处理
3. 关键组件与流程
4. 状态管理与窗口操作
(1) 无状态处理
(2) 有状态处理
(3) 窗口操作
5. 容错与恢复
6. 性能优化
7. 应用场景示例
实时日志分析
8. 与 Structured Streaming 对比
总结
1. 核心模型:DStream(离散流)
- DStream 是 Spark Streaming 的基础抽象,表示一个持续的数据流。
- 本质:DStream 由一系列 RDD(弹性分布式数据集) 构成,每个 RDD 对应一个时间窗口内的数据。
- 处理流程:
实时数据流 → 按时间窗口切分 → 生成 DStream → 转换为 RDD → 应用 Spark 算子(map/reduce等)→ 输出结果
2. 微批处理架构
(1) 数据接收与分块
- 输入源:支持 Kafka、Flume、TCP Socket、HDFS 等。
- 接收器(Receiver):
- 每个接收器在 Executor 中运行,持续接收数据并分块存储到内存。
- 数据块默认按 批次间隔(Batch Interval)(如 1 秒)切分,形成 RDD。
(2) 数据处理
- Driver 调度:
- Driver 将每个批次的 RDD 组成 DStream,并生成 Spark Job。
- Job 提交到集群,由 Executor 并行处理。
- 容错机制:
- 预写日志(WAL):接收器先将数据写入 HDFS/S3 等可靠存储,再处理。
- 检查点(Checkpoint):定期保存 DStream 的元数据(如窗口操作状态)到可靠存储。
3. 关键组件与流程
graph LRA[输入源] --> B(Receiver)B --> C{数据分块}C --> D[内存存储]D --> E[Driver 生成 RDD]E --> F[Spark 引擎处理]F --> G[输出到外部系统]
4. 状态管理与窗口操作
(1) 无状态处理
- 每个批次独立处理,如过滤、映射:
val lines: DStream[String] = ssc.socketTextStream(\"localhost\", 9999)val words = lines.flatMap(_.split(\" \"))val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)
(2) 有状态处理
- updateStateByKey:跨批次维护状态(如累计计数):
def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] = { Some(newValues.sum + state.getOrElse(0))}val stateDStream = wordCounts.updateStateByKey(updateFunc)
(3) 窗口操作
- 基于时间窗口(如过去 5 分钟)聚合数据:
// 每 10 秒统计过去 30 秒的词频val windowedCounts = pairs.reduceByKeyAndWindow( (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
5. 容错与恢复
- 数据容错:
- 接收器故障:通过 WAL 恢复未处理的数据块。
- Executor 故障:RDD 依赖血缘(Lineage)重新计算丢失的分区。
- Driver 容错:
- 启用检查点后,Driver 故障可从检查点重启并恢复 DStream 状态。
6. 性能优化
- 批次间隔:根据延迟需求调整(通常 500ms~2s)。
- 并行度:
- 增加接收器数量(如 Kafka 分区数)提升数据摄入速度。
- 调整
spark.default.parallelism
控制处理并行度。
- 内存管理:
- 避免数据堆积:设置
spark.streaming.receiver.maxRate
限制接收速率。 - 使用序列化(Kryo)减少内存占用。
- 避免数据堆积:设置
7. 应用场景示例
实时日志分析
// 从 Kafka 读取日志,统计错误码频率val kafkaParams = Map(\"bootstrap.servers\" -> \"kafka:9092\")val topics = Set(\"logs\")val stream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))val errorCodes = stream .map(record => parseLog(record.value())) .filter(_.status >= 500) .map(log => (log.errorCode, 1)) .reduceByKey(_ + _)errorCodes.saveAsTextFiles(\"hdfs:///errors\")
8. 与 Structured Streaming 对比
总结
Spark Streaming 通过 微批处理 和 RDD 抽象 实现高吞吐、容错的流式处理,适合对延迟要求不苛刻(秒级)的场景。对于更复杂的实时需求(如毫秒级延迟、事件时间处理),可考虑升级到 Structured Streaming 或使用 Apache Flink。
❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄
💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍
🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙