【Python】PySpark数据分析
第一部分:基石 - 解构 Spark 的分布式计算核心
第一章:超越单机之限:深入 Spark 分布式计算的本质
1.1 为何选择 Spark:从单机瓶颈到分布式革命
我们首先需要理解一个根本性的问题:我们为什么需要像 Spark 这样的工具?答案源于一个物理现实——单台计算机的局限性。在数据科学的日常工作中,我们钟爱的 pandas
库在一台性能优越的机器上可以轻松处理数百万行的数据。然而,当数据规模从 GB 级别跃升至 TB 甚至 PB 级别时,物理定律开始成为不可逾越的障碍。
-
内存瓶颈 (Memory Bottleneck): 当一个数据集的大小超过了计算机的物理内存(RAM)时,操作系统会开始使用虚拟内存,即把硬盘空间当作内存使用。硬盘的读写速度比内存慢数个数量级(机械硬盘慢 10^5 倍,固态硬盘慢 10^3 倍),这会导致数据处理性能急剧下降,甚至程序崩溃。你无法将一个 100GB 的文件完整读入一个只有 32GB 内存的机器中进行操作。
-
CPU 瓶颈 (CPU Bottleneck): 即便数据能放入内存,计算的复杂度也可能成为瓶颈。一个复杂的机器学习训练任务,或是一个需要对海量数据进行全量扫描的聚合操作,在单个 CPU(即使是多核 CPU)上可能需要运行数小时甚至数天。CPU 的核心数量和时钟频率存在物理和功耗的上限,无法无限增长。
分布式计算,正是为了打破这些单机瓶颈而生。其核心思想非常朴素:如果一台机器解决不了问题,那就用很多台机器一起来解决。
Spark 的前身,Hadoop MapReduce,是分布式计算的第一次伟大尝试。它将计算任务分解为 Map
和 Reduce
两个阶段,并能在数千台廉价商用服务器上并行执行,实现了对海量数据的处理能力。但 MapReduce 有其固有的缺陷:
- 高延迟的磁盘 I/O: MapReduce 的每一个计算阶段都会将中间结果写入分布式文件系统(HDFS)。这种设计保证了极高的容错性,但也带来了大量的磁盘读写,对于需要多步骤迭代计算的场景(如机器学习算法)性能极差。
- 编程模型僵化: 开发者被严格限制在
Map
和Reduce
的编程范式中,实现复杂的逻辑非常繁琐。
Apache Spark 的诞生,正是对 MapReduce 的一场革命。它继承了 MapReduce 的分布式、可扩展、容错的思想,但通过引入两个颠覆性的概念,彻底改变了分布式数据处理的游戏规则:
-
基于内存的计算 (In-Memory Computing): Spark 允许将中间计算结果存储在参与计算的各个节点的内存中,而不仅仅是磁盘上。对于需要反复使用的数据集,这可以带来高达 100 倍的性能提升。这使得迭代式算法(如梯度下降)和交互式数据分析成为可能。
-
有向无环图 (Directed Acyclic Graph - DAG): Spark 将一系列的计算操作构建成一个 DAG。它并不像 MapReduce 那样在每一步后都固化结果,而是将整个计算流程看作一个图。只有当需要一个最终结果(一个“Action”操作)时,Spark 才会根据这个图制定最优的执行计划,并将任务分发到集群执行。这种“惰性求值”(Lazy Evaluation)的策略为大量的自动优化提供了可能。
因此,选择 Spark,并非仅仅是选择了一个“更快的工具”,而是选择了一种全新的、能够将成百上千台计算机的计算资源和存储资源凝聚成一个统一的、强大的“超级计算机”的思维范式和编程模型。PySpark
作为 Spark 的 Python API,则成功地将这种强大的分布式能力与 Python 语言的简洁、易用和丰富的生态系统结合在一起,成为了现代大规模数据分析和机器学习领域不可或缺的基石。
1.2 Spark 生态系统全景:核心组件与应用领域
Spark 并不是一个单一的工具,而是一个由多个紧密集成的组件构成的强大生态系统。理解每个组件的角色和定位,是有效利用 Spark 解决不同领域问题的基础。
-
Spark Core: 这是整个 Spark 项目的基石。它提供了 Spark 的核心功能,包括任务调度、内存管理、故障恢复、与存储系统的交互等。Spark Core 的核心数据抽象是弹性分布式数据集 (Resilient Distributed Dataset - RDD),它是后续所有高级 API 的基础。本指南的第二部分将深入剖析 RDD 的内部机制。
-
Spark SQL: 这是 Spark 用于处理结构化数据的模块。它在 Spark Core 之上提供了两种核心抽象:DataFrame 和 Dataset(在 PySpark 中,DataFrame 是主要的 API)。Spark SQL 允许你像使用 SQL 查询关系型数据库一样查询 Spark 中的数据,也支持直接从多种数据源(如 Hive、Parquet、JSON、CSV)读取。更重要的是,Spark SQL 包含一个名为 Catalyst 的高级查询优化器,它能够自动将用户的查询(无论是 SQL 语句还是 DataFrame 操作)优化成最高效的物理执行计划。这是 Spark 高性能的关键所在,我们将在第三部分详细探讨。
-
Spark Streaming (及 Structured Streaming): 这是 Spark 提供的用于处理流数据的组件。传统的 Spark Streaming(基于 DStream)将实时数据流切分成一系列小的批次(micro-batch),然后使用 Spark Core 引擎进行处理。而更现代的 Structured Streaming 则是构建在 Spark SQL 引擎之上,将数据流看作是一张“无边界的表”。你可以像对静态表一样对数据流执行查询,Spark 会负责在有新数据到达时增量地更新结果。这极大地简化了流处理应用的开发。
-
MLlib (Machine Learning Library): 这是 Spark 的机器学习库。它提供了大量的常用机器学习算法,这些算法都被设计为可以在集群上并行执行。MLlib 包含分类、回归、聚类、协同过滤等多种算法,以及特征提取、转换、降维和选择等工具。MLlib 利用 Spark 的迭代计算能力,在处理大规模训练数据时表现出色。
-
GraphX (及 GraphFrames): 这是 Spark 用于图计算的 API。GraphX 引入了带属性的图数据结构(Property Graph),并提供了一系列图计算操作符,如
pregel
、connectedComponents
、pageRank
等。虽然 Spark 的图计算能力不如专门的图数据库(如 Neo4j),但对于需要在已有的大规模数据集上进行图分析和挖掘的场景,GraphX 提供了一个方便且强大的解决方案。GraphFrames
是一个基于 DataFrame 的更新、更友好的图 API。
PySpark 的角色:
PySpark
并非一个独立的组件,而是整个生态系统的 Python 语言接口。它通过一个名为 Py4J
的库,让 Python 代码能够调用 Spark 底层的 Java/Scala 对象和方法。这意味着,当你使用 PySpark 编写代码时,你实际上是在 Python 进程(Driver 端)中构建计算逻辑,然后 PySpark 将这些逻辑序列化并发送到在 Java 虚拟机(JVM)中运行的 Spark Executor 上去执行。理解这一点对于后续进行性能调优(特别是关于 UDF 的部分)至关重要。
1.3 核心架构剖析:Driver、Executor 与任务的分布式之旅
为了真正掌握 PySpark,我们必须揭开其运行时的面纱,理解一个 Spark 应用(Application)是如何在集群中启动和执行的。其核心架构由三个关键角色构成:驱动器(Driver)、执行器(Executor)和集群管理器(Cluster Manager)。
核心角色定义:
-
驱动器 (Driver): 这是运行你的
main()
函数的进程。当你通过spark-submit
提交一个 PySpark 脚本时,Driver 进程就会被启动。它的核心职责是:- 创建
SparkContext
:SparkContext
是与 Spark 集群通信的入口点,负责连接到集群管理器并申请计算资源。 - 将用户代码转化为任务: Driver 会将你的代码(RDD 的转换操作)解析成一个逻辑执行计划(DAG)。
- 生成物理执行计划: 当一个 Action 操作被触发时,DAG 调度器(DAGScheduler)会将逻辑计划(DAG)分割成一组阶段(Stages)。每个 Stage 包含了一系列可以并行执行、且没有数据混洗(Shuffle)依赖的任务(Tasks)。
- 任务调度: Driver 会将这些任务发送到集群中的 Executor 上去执行。
- 跟踪执行状态: Driver 会持续跟踪每个任务的执行情况,并在任务失败时进行重试。
- 创建
-
执行器 (Executor): 这是在集群的工作节点(Worker Node)上为某个 Spark 应用启动的一个工作进程。每个 Executor 都拥有自己独立的 JVM 和一块内存。它的核心职责是:
- 执行任务: Executor 接收来自 Driver 的任务,并在其内部的线程池中执行。一个任务处理 RDD 的一个分区(Partition)。
- 存储数据: 如果用户代码中包含
cache()
或persist()
操作,Executor 会负责将 RDD 的分区数据存储在内存或磁盘上。 - 返回结果: 将任务的计算结果返回给 Driver。
-
集群管理器 (Cluster Manager): 这是负责在集群中分配和管理资源的外部服务。Spark 本身不包含资源管理能力,它需要依赖一个集群管理器。常见的有:
- Standalone: Spark 自带的简单集群管理器,适合学习和小型私有集群。
- Apache YARN (Yet Another Resource Negotiator): Hadoop 生态系统中的标准资源管理器,也是生产环境中最常用的选择。
- Apache Mesos: 一个更通用的集群资源管理器。
- Kubernetes: 近年来日益流行的容器编排系统,也成为了部署 Spark 应用的重要选项。
一个 Spark 作业的生命周期(以 YARN 为例):
- 提交应用: 你在客户端机器上执行
spark-submit my_script.py
。 - Driver 启动:
spark-submit
会向 YARN 的资源管理器(ResourceManager)申请启动一个容器来运行你的 Driver 进程。 - 申请 Executor: Driver 内部的
SparkContext
启动后,会向 YARN 的 ResourceManager 再次发出请求,申请一定数量(由配置决定)的 Executor 容器。 - Executor 启动: YARN 的 ResourceManager 会在集群中可用的工作节点(NodeManager)上分配容器,并启动 Executor 进程。Executor 启动后会反向注册到 Driver。
- 任务分发: Driver 开始分析你的代码。当遇到一个 Action 时,它会将计算图(DAG)划分为多个 Stage,每个 Stage 包含多个 Task。然后,Driver 将这些 Task 分发给已经注册的 Executor。
- 任务执行: 每个 Executor 在其分配到的 CPU 核心上并行执行收到的 Task。每个 Task 处理输入 RDD 的一个分区,并将结果写到内存或本地磁盘。
- 结果返回/Shuffle: 如果需要 Shuffle(如
reduceByKey
),Executor 会将计算的中间结果写出,供下一个 Stage 的 Task 拉取。如果是一个collect()
这样的 Action,Executor 会将结果直接返回给 Driver。 - 应用完成: 当所有任务都成功执行完毕,
main()
函数退出,Driver 会向 YARN ResourceManager 注销自己,并释放所有容器。应用结束。
通过代码理解 SparkContext
和 SparkSession
:
在现代 PySpark (2.x 及以后) 中,我们通常使用 SparkSession
作为入口点。SparkSession
封装了 SparkContext
,并提供了 DataFrame 和 Spark SQL 功能的统一入口。
# 导入 SparkSessionfrom pyspark.sql import SparkSessiondef main(): # 1. 创建 SparkSession # .builder 是一个建造者模式的接口 # .appName() 为你的 Spark 应用设置一个名字,这个名字会显示在 Spark UI 上 # .master() 设置要连接的 Spark 集群。 # - \"local\" 表示在本地单机上运行,使用一个线程 # - \"local[4]\" 表示在本地单机上运行,使用 4 个 CPU 核心 # - \"spark://:\" 连接到一个 Standalone 集群 # - \"yarn\" 连接到一个 YARN 集群 (需要相应的配置) # .getOrCreate() 会获取一个已存在的 SparkSession,或者如果不存在,则创建一个新的 spark = SparkSession.builder \\ .appName(\"MyFirstPySparkApp\") \\ .master(\"local[2]\") \\ .getOrCreate() # 这段代码执行时,Driver 进程就启动了,并且创建了 SparkSession # SparkContext 仍然可以通过 SparkSession 访问 sc = spark.sparkContext # sc.getConf().getAll() 可以查看当前 Spark 应用的所有配置 print(f\"Spark 应用名称: { sc.appName}\") print(f\"Spark 应用 ID: { sc.applicationId}\") print(f\"Spark 用户: { sc.sparkUser()}\") print(f\"Spark 版本: { sc.version}\") # 2. 创建一个 RDD (这里使用 SparkContext) # parallelize 方法可以将一个本地的 Python 集合转换为一个分布式的 RDD # 第二个参数 2 指定了希望将这个 RDD 分成多少个分区 (Partition) data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] rdd = sc.parallelize(data, 2) # 到这里,只是定义了一个 RDD,由于惰性求值,没有任何计算实际发生 print(f\"RDD 的分区数量是: { rdd.getNumPartitions()}\") # 这个操作会触发一个作业来计算分区数 # 3. 对 RDD 执行一个转换 (Transformation) 操作 # map 是一个转换操作,它会对 RDD 中的每个元素应用一个函数 # 这里我们将每个数字乘以 2 doubled_rdd = rdd.map(lambda x: x * 2) # 同样,由于惰性求值,这一步也只是在 DAG 中增加了一个节点,没有实际计算 # 4. 执行一个动作 (Action) 操作来触发计算 # collect() 是一个动作,它会将 RDD 中的所有数据收集回 Driver 进程,并以 Python 列表的形式返回 # 这个操作会触发一个完整的 Spark 作业 result = doubled_rdd.collect() # 当 .collect() 被调用时,Driver 会分析整个 DAG (parallelize -> map), # 生成任务,并发送到 Executor 上去执行。 # Executor 执行 map(lambda x: x*2),然后将结果返回给 Driver。 print(f\"计算结果是: { result}\") # 打印最终的列表 # 5. 停止 SparkSession # 这是一个好习惯,在应用结束时显式地停止会话,以释放资源 spark.stop()if __name__ == \"__main__\": main()
这段简单的代码完整地演示了一个 PySpark 应用的微观生命周期:从创建会话、定义数据和转换,到通过一个 Action 触发真正的分布式计算,最后释放资源。理解这个流程是后续学习所有高级功能和性能调优的基础。
第二部分:理论基石 - RDD 的设计哲学与编程范式
虽然现代 PySpark 数据分析主要围绕 DataFrame API 展开,但深入理解其底层的 RDD (Resilient Distributed Dataset) 却是从“会用”到“精通”的必经之路。RDD 是 Spark 分布式计算抽象的灵魂,它的设计思想直接决定了 Spark 的高性能和容错性。DataFrame 的所有优化,最终也是被编译成高效的 RDD 操作来执行的。
第二章:RDD - Spark 的不可变分布式集合
2.1 RDD 的五大核心属性:深入其“弹性”与“分布式”的本质
RDD,弹性分布式数据集,这个名字本身就蕴含了其核心特性。它并不仅仅是一个分布在多台机器上的数据集合,其设计充满了精巧的工程智慧。一个 RDD 对象,其内部主要由五个核心属性来定义,理解这五个属性,就等于理解了 RDD 的工作原理。
-
一组分区 (A list of partitions): 这是 RDD “分布式”特性的体现。一个 RDD 在逻辑上是一个完整的数据集,但在物理上,它被切分成多个分区(Partitions)。每个分区都是数据集的一个子集,并被存储在集群中的一个节点上。一个任务(Task)处理一个分区。分区的数量决定了 Spark 作业的并行度。更多的分区意味着可以利用更多的 CPU核心来并行处理,但过多的分区也会带来额外的调度开销。
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"PartitionExample\").master(\"local[4]\").getOrCreate()sc = spark.sparkContext# 创建一个有 1000 个元素的 RDDmy_data = range(1000)# 方式一:在创建时指定分区数rdd1 = sc.parallelize(my_data, 5) # 将这个 RDD 分成 5 个分区print(f\"rdd1 的分区数: { rdd1.getNumPartitions()}\") # getNumPartitions() 返回 RDD 的分区数量# 方式二:从文件中读取,分区数由 Spark 决定# 假设我们有一个名为 \'my_text_file.txt\' 的文件# sc.textFile(\"my_text_file.txt\") # 默认分区数通常与 HDFS 块大小相关# 使用 glom() 来直观地看到分区的内容# glom() 是一个转换操作,它会将每个分区中的所有元素合并成一个列表# collect() 会将所有分区的结果(即多个列表)收集回 Driverpartition_contents = rdd1.glom().collect()print(\"\\nrdd1 各个分区的内容:\")for i, partition in enumerate(partition_contents): # partition 是一个列表,包含了该分区的所有数据 print(f\" 分区 { i}: 包含 { len(partition)} 个元素,前3个元素是 { partition[:3]}\") // 打印每个分区的信息,包括其包含的元素数量和前几个元素spark.stop()
-
一个作用于每个分区的计算函数 (A function to compute each partition): 这是 RDD 计算的核心。每个 RDD 都包含一个函数,这个函数描述了如何基于其父 RDD 的分区来计算出当前 RDD 的分区。例如,在一个
rdd.map(lambda x: x + 1)
操作中,这个计算函数就是lambda x: x + 1
,它将被独立地应用到 RDD 的每一个分区上。 -
一组对其他 RDD 的依赖关系 (A list of dependencies on other RDDs): 这是 RDD “弹性”和容错机制的基石。每个 RDD 都明确地知道它是从哪些父 RDD 计算而来的。这种依赖关系被记录在一个**血缘图谱(Lineage Graph)**或 DAG 中。当某个分区的数据丢失时(例如,所在的 Executor 崩溃了),Spark 可以通过这个血缘关系,从原始数据开始,重新计算出丢失的分区,而无需对整个数据集进行备份。
依赖关系分为两种:
- 窄依赖 (Narrow Dependency): 父 RDD 的每个分区最多只被子 RDD 的一个分区使用。例如
map
,filter
,union
。窄依赖的计算非常高效,可以在单个节点上以流水线(Pipelining)的方式执行,无需等待其他节点。 - 宽依赖 (Wide Dependency / Shuffle Dependency): 子 RDD 的一个分区可能依赖于父 RDD 的所有分区。例如
reduceByKey
,groupByKey
,join
。宽依赖通常意味着需要在网络间进行数据混洗(Shuffle),这是一个非常昂贵的操作,因为需要将数据在不同节点间进行传输和重组。Spark 会将宽依赖作为 Stage 划分的边界。
- 窄依赖 (Narrow Dependency): 父 RDD 的每个分区最多只被子 RDD 的一个分区使用。例如
-
一个可选的分区器 (An optional Partitioner for key-value RDDs): 对于键值对(Key-Value)类型的 RDD,比如通过
reduceByKey
生成的 RDD,可以带有一个分区器(Partitioner)。分区器的作用是告诉 Spark 如何根据键(Key)来决定一条数据应该被分配到哪个分区。最常见的是哈希分区器(HashPartitioner),它根据key.hashCode() % numPartitions
来分配分区。拥有分区器可以极大地优化后续的操作。例如,如果两个 RDD 都使用了相同的哈希分区器,那么对它们进行join
操作时,就可以避免一次 Shuffle,因为具有相同键的记录已经被预先分配到了相同的节点上。 -
一个可选的优先位置列表 (An optional list of preferred locations): 这是为了实现数据本地性(Data Locality)而设计的。对于从 HDFS 等分布式文件系统读取数据创建的 RDD,这个列表会记录每个分区数据所在的物理节点位置。当 Spark 调度任务时,它会优先将计算任务分配到数据所在的节点上执行(
PROCESS_LOCAL
级别),以避免通过网络传输数据。如果数据所在节点繁忙,它会尝试分配到同机架的其他节点(NODE_LOCAL
),最差的情况才是跨机架传输数据(RACK_LOCAL
或ANY
)。
这五大属性共同定义了一个 RDD 的全部信息,使得 Spark 能够以一种高效、容错、可恢复的方式执行分布式计算。
第二章:RDD - Spark 的不可变分布式集合
2.2 转换(Transformations)与动作(Actions):惰性求值的核心机制
Spark 的编程模型优雅而高效,其核心在于对操作的两种截然不同的分类:转换(Transformations)和动作(Actions)。理解这两者的区别,以及它们如何与“惰性求值”(Lazy Evaluation)协同工作,是掌握 PySpark 编程范式的关键。
转换(Transformations):构建计算蓝图
转换操作的本质是:从一个已有的 RDD 创建一个新的 RDD。所有的转换操作都遵循“惰性求值”的原则。当你对一个 RDD 调用一个转换方法时,Spark 并不会立即执行计算。相反,它只是在内部的 DAG(有向无环图)中记录下这个操作,将新的 RDD 作为图的一个新节点,并记录它与其父 RDD 的依赖关系。
可以把转换操作想象成是在绘制一张复杂的计算蓝图。你一步步地告诉 Spark 你想做什么——先过滤数据,再映射转换,然后分组,但 Spark 只是默默地记下你的指令,并不断完善这张蓝图,它并不会去真正地拿起工具开始施工。
常见的转换操作:
-
map(func)
: 这是最基础的转换。它将一个函数func
应用于 RDD 的每一个元素,并返回一个包含新元素的新 RDD。输入分区和输出分区的元素是一一对应的(窄依赖)。from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"TransformationMap\").master(\"local\").getOrCreate()sc = spark.sparkContextdata_rdd = sc.parallelize([\"hello world\", \"apache spark\", \"pyspark is powerful\"])// 创建一个包含字符串的 RDD# 使用 map 转换,计算每个字符串的长度lengths_rdd = data_rdd.map(lambda s: len(s))// lambda s: len(s) 是作用于每个元素的函数// lengths_rdd 是一个新的 RDD,此时尚未进行任何实际计算# 打印 RDD 对象本身,而不是其内容print(lengths_rdd) // 输出会显示这是一个 PipelinedRDD 或 MapPartitionsRDD,表明它是一个转换的结果spark.stop()
-
filter(func)
: 过滤 RDD 中的元素。它将一个返回布尔值的函数func
应用于 RDD 的每个元素,只保留那些使函数返回True
的元素,形成一个新的 RDD(窄依赖)。from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"TransformationFilter\").master(\"local\").getOrCreate()sc = spark.sparkContextnumbers_rdd = sc.parallelize(range(20))// 创建一个从 0 到 19 的 RDD# 使用 filter 转换,只保留偶数even_numbers_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)// lambda x: x % 2 == 0 是一个判断函数,返回 True 或 False// even_numbers_rdd 是一个新的 RDD,只记录了要进行过滤的这个意图print(even_numbers_rdd)spark.stop()
-
flatMap(func)
: 与map
类似,但每个输入元素可以被映射为零个、一个或多个输出元素。func
必须返回一个序列(如列表或元组),flatMap
会将所有返回的序列“压平”成一个单一的 RDD(窄依赖)。from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"TransformationFlatMap\").master(\"local\").getOrCreate()sc = spark.sparkContextsentences_rdd = sc.parallelize([\"hello spark\", \"learning pyspark\"])// 创建一个包含两个句子的 RDD# 使用 flatMap 将每个句子拆分成单词words_rdd = sentences_rdd.flatMap(lambda sentence: sentence.split(\" \"))// lambda sentence: sentence.split(\" \") 会对每个句子返回一个单词列表,如 [\'hello\', \'spark\']// flatMap 会将 [\'hello\', \'spark\'] 和 [\'learning\', \'pyspark\'] 这两个列表合并成一个包含四个元素的新 RDDprint(words_rdd)spark.stop()
-
distinct()
: 返回一个包含源 RDD 中所有不重复元素的新 RDD。这是一个宽依赖操作,因为它需要进行 Shuffle 来确保全局的唯一性。 -
union(otherRDD)
: 返回一个包含源 RDD 和另一个 RDD 所有元素的新 RDD(窄依赖)。 -
键值对(Pair RDD)转换: 这类转换操作只能用于由
(key, value)
元组组成的 RDD。reduceByKey(func)
: 对具有相同键的值进行聚合。func
接受两个值作为输入,返回一个值。这是一个宽依赖操作。groupByKey()
: 对具有相同键的值进行分组,返回一个新的 RDD,其中每个元素是(key, an_iterable_of_values)
。这也是一个宽依赖操作。通常应优先使用reduceByKey
,因为它在 Shuffle 之前会在每个分区本地进行一次预聚合,大大减少了网络传输的数据量。sortByKey()
: 按键对 RDD 进行排序(宽依赖)。join(otherRDD)
: 对两个键值对 RDD 进行内连接(宽依赖)。
动作(Actions):触发计算并返回值
动作操作是整个计算过程的“扳机”。当你对一个 RDD 调用一个动作方法时,Spark 才会真正地开始工作。它会审查已经构建好的 DAG,生成一个优化的物理执行计划,将任务分发到集群上,并开始执行计算。动作操作的返回值不再是一个 RDD,而是一个非 RDD 的类型,比如一个 Python 的原生数据类型(如数字、列表)或者将结果写入外部存储系统。
常见的动作操作:
-
collect()
: 最常用但也最需要小心的动作。它会将 RDD 中的所有元素都收集回 Driver 进程,并以一个 Python 列表的形式返回。这对于调试和查看小数据集的结果非常有用,但如果 RDD 非常大,调用collect()
会轻易地导致 Driver 内存溢出(OutOfMemoryError
),因为 Driver 的内存通常是有限的。from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"ActionCollect\").master(\"local[2]\").getOrCreate()sc = spark.sparkContext# 完整的“转换+动作”链条data = range(100)rdd = sc.parallelize(data, 4)# 转换链:过滤 -> 映射processed_rdd = rdd.filter(lambda x: x > 50).map(lambda x: x * x)// 这一系列转换操作只是在构建 DAG# 调用 collect() 动作,触发计算results = processed_rdd.collect()// 1. collect() 被调用// 2. Spark 分析 DAG:parallelize -> filter -> map// 3. 将任务分发到 4 个 Executor 的核心上// 4. 每个任务处理一个分区的数据,执行过滤和映射// 5. 各个 Executor 将其计算结果发送回 Driver// 6. Driver 将所有结果合并成一个 Python 列表print(f\"大于50的数字的平方是: { results}\")spark.stop()
-
count()
: 返回 RDD 中的元素数量(一个整数)。 -
first()
: 返回 RDD 的第一个元素(与take(1)[0]
类似)。 -
take(n)
: 从 RDD 中返回前n
个元素,并以一个列表的形式返回给 Driver。它会尝试只计算尽可能少的分区来满足n
个元素的需求,比collect()
更高效。 -
reduce(func)
: 通过一个二元函数func
来并行地聚合 RDD 中的所有元素。func
必须是满足交换律和结合律的,这样才能在分区内和分区间正确地并行计算。from pyspark.sql import SparkSessionimport operator # 导入 operator 模块以使用 add 函数spark = SparkSession.builder.appName(\"ActionReduce\").master(\"local\").getOrCreate()sc = spark.sparkContextnumbers_rdd = sc.parallelize(range(1, 101), 4) # 1 到 100,分成 4 个分区# 使用 reduce 计算 1 到 100 的总和total_sum = numbers_rdd.reduce(operator.add)# 1. 每个分区内部会先用 operator.add 进行一次本地求和# 比如分区1: sum(1..25), 分区2: sum(26..50), ...# 2. 这 4 个分区的局部和会被发送回 Driver# 3. Driver 再用 operator.add 将这 4 个局部和相加,得到最终总和print(f\"1到100的总和是: { total_sum}\") # 结果应该是 5050spark.stop()
-
foreach(func)
: 对 RDD 中的每一个元素应用一个函数func
。这个动作通常用于需要对每个元素执行一个带有“副作用”(Side Effect)的操作,比如将数据写入数据库或发送到消息队列。需要注意的是,func
是在 Executor 端执行的,而不是在 Driver 端。 -
saveAsTextFile(path)
: 将 RDD 的内容保存为一个文本文件(或一组文件,每个分区一个文件)到指定的路径(如 HDFS 或本地文件系统)。
惰性求值的巨大优势:
-
自动优化: 因为 Spark 掌握了整个计算的蓝图(DAG),所以它可以在执行前进行大量的优化。例如,它可以将多个连续的
map
操作合并成一个单一的map
操作(称为流水线操作 Pipelining),减少函数调用的开销。它可以重新安排操作的顺序,将filter
操作尽可能地提前,以减少需要处理的数据量。 -
减少数据移动: Spark 会尽量推迟数据的移动(Shuffle),直到绝对必要的时候。如果没有惰性求值,每个宽依赖操作都会立即触发一次全量的数据混洗,而有了惰性求值,Spark 可以分析整个流程,看看是否能通过改变计划来避免或减少 Shuffle。
-
资源管理: 在一个 Action 被调用之前,Spark 应用实际上没有占用任何计算资源(除了 Driver 本身)。这使得资源管理器可以更有效地调度多个应用。
-
容错性: 如前所述,DAG 本身就是 RDD 容错机制的核心。
掌握了转换与动作的区别,你就可以开始用 Spark 的思维方式来思考问题:将一个复杂的数据处理任务分解成一系列的转换操作,构建起你的数据处理流水线,最后在需要最终结果的地方,用一个合适的动作操作来触发整个流程的执行。
2.3 键值对 RDD(Pair RDD)的威力与操作
虽然标准的 RDD 可以处理任何类型的 Python 对象,但在大规模数据处理中,一类特殊的 RDD——键值对 RDD(Pair RDD)——扮演着至关重要的角色。一个 Pair RDD 中的每个元素都是一个 (key, value)
形式的元组。几乎所有现实世界的数据聚合、分组和连接任务,都依赖于将数据转化为 Pair RDD 来进行操作。
Spark 为 Pair RDD 提供了一套专属的、高度优化的转换和动作操作,这些操作使得按键进行数据处理变得极其高效和便捷。
如何创建 Pair RDD:
创建 Pair RDD 最常见的方式是使用 map()
转换,将一个普通的 RDD 转换成 (key, value)
的形式。
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"CreatePairRDD\").master(\"local\").getOrCreate()sc = spark.sparkContextlines_rdd = sc.textFile(\"path/to/your/logfile.log\") # 假设我们有一个日志文件# 假设每行日志格式是: \"TIMESTAMP LEVEL MESSAGE\"# 例如: \"2023-10-27T10:00:00 ERROR Connection failed\"# 我们想按日志级别(LEVEL)进行统计,所以需要将日志级别作为 key# 使用 map 将每行字符串转换为 (key, value) 对# key 是日志级别,value 是 1 (用于后续计数)log_level_rdd = lines_rdd.map(lambda line: (line.split(\" \")[1], 1))# lambda line: (line.split(\" \")[1], 1) 会将 \"2023-10-27T10:00:00 ERROR Connection failed\" # 转换为 (\'ERROR\', 1)# 现在 log_level_rdd 就是一个 Pair RDD# 我们可以调用它专属的操作,比如 reduceByKeyerror_counts = log_level_rdd.reduceByKey(lambda a, b: a + b)// 这会统计每个日志级别的出现次数# 触发计算并查看结果results = error_counts.collect()print(results) # 可能会输出 [(\'ERROR\', 150), (\'INFO\', 2000), (\'WARN\', 30)]spark.stop()
核心的 Pair RDD 转换操作:
-
reduceByKey(func, [numPartitions])
: 这是 Pair RDD 最重要、最高效的聚合操作。它对具有相同键的值应用一个归约函数func
。与groupByKey
不同,reduceByKey
会在 Shuffle 数据到网络之前,在每个原始分区上进行一次本地的“预聚合”(Combiner)。这极大地减少了需要通过网络传输的数据量。reduceByKey
的工作流程图解:
假设我们有两个分区,数据如下:
Partition 1:[(\'A\', 1), (\'B\', 1), (\'A\', 1)]
Partition 2:[(\'A\', 1), (\'C\', 1), (\'B\', 1)]
- Map-Side Combine:
reduceByKey
会先在每个分区内部对相同 key 的值进行聚合。- Partition 1 聚合后变为:
[(\'A\', 2), (\'B\', 1)]
- Partition 2 聚合后变为:
[(\'A\', 1), (\'C\', 1), (\'B\', 1)]
- Partition 1 聚合后变为:
- Shuffle: 只有聚合后的结果才会被发送到网络上,进行 Shuffle。所有 ‘A’ 的中间结果被发送到一个节点,所有 ‘B’ 的到另一个节点,等等。
- 发往 ‘A’ 所在分区的数据:
[(\'A\', 2), (\'A\', 1)]
- 发往 ‘B’ 所在分区的数据:
[(\'B\', 1), (\'B\', 1)]
- 发往 ‘C’ 所在分区的数据:
[(\'C\', 1)]
- 发往 ‘A’ 所在分区的数据:
- Reduce: 在目标分区上,再次使用归约函数进行最终的聚合。
- ‘A’ 分区结果:
(\'A\', 3)
- ‘B’ 分区结果:
(\'B\', 2)
- ‘C’ 分区结果:
(\'C\', 1)
- ‘A’ 分区结果:
- Map-Side Combine:
-
groupByKey([numPartitions])
: 按键对值进行分组。它返回一个(key, iterable_of_values)
形式的 RDD。groupByKey
不会进行 Map-Side Combine。它会把所有具有相同键的值都通过网络传输到同一个节点,然后在那里将它们收集到一个迭代器中。如果某个键对应的值非常多,这可能会导致单个节点的内存溢出。因此,如果你能用reduceByKey
实现的功能,就绝对不要用groupByKey
。groupByKey
的工作流程图解 (同样的数据):
Partition 1:[(\'A\', 1), (\'B\', 1), (\'A\', 1)]
Partition 2:[(\'A\', 1), (\'C\', 1), (\'B\', 1)]
- Shuffle: 所有原始的键值对都被发送到网络上。
- 发往 ‘A’ 所在分区的数据:
[(\'A\', 1), (\'A\', 1), (\'A\', 1)]
- 发往 ‘B’ 所在分区的数据:
[(\'B\', 1), (\'B\', 1)]
- 发往 ‘C’ 所在分区的数据:
[(\'C\', 1)]
- 发往 ‘A’ 所在分区的数据:
- Group: 在目标分区上,将值收集成迭代器。
- ‘A’ 分区结果:
(\'A\', )
- ‘B’ 分区结果:
(\'B\', )
- ‘C’ 分区结果:
(\'C\', )
- ‘A’ 分区结果:
对比网络传输的数据量,
reduceByKey
明显胜出。只有当你需要对一个键的所有值进行一些不能被增量归约的操作时(比如计算中位数),才应该考虑使用groupByKey
。 - Shuffle: 所有原始的键值对都被发送到网络上。
-
aggregateByKey(zeroValue, seqFunc, combFunc, [numPartitions])
: 这是reduceByKey
和groupByKey
的“泛化”版本,也是最灵活的按键聚合函数。它允许你指定一个与输入值类型不同的输出聚合值类型。zeroValue
: 每个键的聚合“零值”或初始值。seqFunc
: 在每个分区内,用于将一个值合并到聚合器中的函数。combFunc
: 在不同分区之间,用于合并两个聚合器的函数。
示例:计算每个用户的平均购买金额
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"AggregateByKeyExample\").master(\"local\").getOrCreate()sc = spark.sparkContext# (user_id, purchase_amount)purchase_data = [(\'user1\', 10.0), (\'user2\', 25.0), (\'user1\', 5.0), (\'user2\', 35.0), (\'user1\', 15.0)]purchase_rdd = sc.parallelize(purchase_data)# 我们想计算 (总金额, 总次数),最后再相除得到平均值# 初始值是 (0.0, 0),即 (sum, count)zero_value = (0.0, 0)# seqFunc: 在分区内,如何将一个 purchase_amount 合并到 (sum, count) 累加器中# (acc_sum, acc_count), value -> (acc_sum + value, acc_count + 1)seq_func = (lambda acc, value: (acc[0] + value, acc[1] + 1))# combFunc: 如何合并两个分区的 (sum, count) 累加器# (acc1_sum, acc1_count), (acc2_sum, acc2_count) -> (acc1_sum + acc2_sum, acc1_count + acc2_count)comb_func = (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))# 执行 aggregateByKeysum_and_counts_rdd = purchase_rdd.aggregateByKey(zero_value, seq_func, comb_func)# 结果是 (user_id, (total_sum, total_count))# 例如:(\'user1\', (30.0, 3)), (\'user2\', (60.0, 2))# 最后,使用 mapValues 计算最终的平均值# mapValues 是一个只对 Pair RDD 的 value 部分进行操作的转换,不会引起 Shuffleaverage_rdd = sum_and_counts_rdd.mapValues(lambda sum_count: sum_count[0] / sum_count[1])results = average_rdd.collect()print(f\"每个用户的平均购买金额: { results}\") # 输出: [(\'user1\', 10.0), (\'user2\', 30.0)]spark.stop()
-
combineByKey(createCombiner, mergeValue, mergeCombiners)
: 这是aggregateByKey
的底层实现,也是最底层的按键聚合 API,提供了最大的控制度。三者的关系是:combineByKey
->aggregateByKey
->reduceByKey
。 -
join(other)
、leftOuterJoin(other)
、rightOuterJoin(other)
、fullOuterJoin(other)
: 用于连接两个 Pair RDD。这些都是宽依赖操作。如果参与连接的两个 RDD 拥有相同的、已知的 Partitioner,Spark 就可以执行一次更高效的“协同定位连接”(Co-located Join),避免对其中一个 RDD 进行 Shuffle。
掌握 Pair RDD 的操作,尤其是深刻理解 reduceByKey
, groupByKey
, 和 aggregateByKey
之间的区别与联系,是编写高性能、可扩展的 PySpark 数据聚合程序的关键所在。
2.4 持久化之道:cache()
、persist()
与 checkpoint()
的艺术
在 Spark 的惰性求值模型中,每个 RDD 都是通过其父 RDD 转换而来的。这意味着,每当一个动作(Action)被触发时,如果某个 RDD 被多次使用,Spark 默认会从头开始重新计算这个 RDD 及其所有的祖先 RDD。对于简单的计算链,这没有问题。但对于复杂的、尤其是迭代式的算法(如机器学习、图算法),反复地重算同一个中间 RDD 会带来毁灭性的性能灾难。
为了解决这个问题,Spark 提供了**持久化(Persistence)**机制,允许用户主动地告诉 Spark:“这个 RDD 很重要,我以后还要多次使用它,请你帮我把它计算一次后,将结果存储起来,以备后续重用。”
2.4.1 cache()
与 persist()
:将数据保留在计算节点
cache()
和 persist()
是最常用的持久化方法。它们的作用是将一个 RDD 的分区计算出来后,保存在 Executor 节点的内存或磁盘上。
-
cache()
: 这是最简单直接的持久化方法。它实际上是persist(StorageLevel.MEMORY_ONLY)
的一个别名。它告诉 Spark 将 RDD 的分区数据以反序列化的 Java 对象形式存储在 Executor 的 JVM 堆内存中。 -
persist(storageLevel)
: 这是一个更通用的方法,它允许你指定不同的存储级别(Storage Level),从而对持久化策略进行精细的控制。
存储级别(StorageLevel)的深度剖析:
pyspark.StorageLevel
类定义了多种存储策略,每种策略都是在速度、空间效率和可靠性之间做出的不同权衡。
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK
MEMORY_AND_DISK_SER
DISK_ONLY
OFF_HEAP
此外,每种级别还有一个复制版本(如 MEMORY_ONLY_2
, MEMORY_AND_DISK_2
),它们会将每个分区同时存储在两个不同的节点上。这提供了更高的容错性(一个节点挂掉,数据副本仍在),但代价是双倍的存储空间和网络开销。
cache()
的实战:迭代算法性能优化
让我们以一个简化的迭代算法为例,直观地感受 cache()
带来的性能提升。假设我们想通过多次迭代来“增强”一组数值。
import timefrom pyspark.sql import SparkSessionfrom pyspark.storagelevel import StorageLevelspark = SparkSession.builder.appName(\"CacheExample\").master(\"local[2]\").getOrCreate()sc = spark.sparkContext# 创建一个 RDDinitial_rdd = sc.parallelize(range(10_000_000), 8) # 一个有 1000 万个数字的 RDDdef enhance_data(rdd): \"\"\"一个模拟的、计算开销较大的转换函数\"\"\" time.sleep(0.5) # 模拟每个分区的复杂计算 return rdd.map(lambda x: x * 1.01 + 0.5)# --- 场景一:不使用 cache() ---print(\"--- 开始执行迭代计算 (不使用 cache) ---\")start_time_no_cache = time.time()# 假设这是一个迭代算法,需要多次使用同一个 RDDcurrent_rdd = initial_rddfor i in range(5): # 对 current_rdd 进行一些转换 enhanced = enhance_data(current_rdd) # 动作:count() 触发计算。每次循环,enhanced RDD 都会从 initial_rdd 开始重新计算 count_result = enhanced.count() print(f\"迭代 { i+1}: 元素数量 { count_result}\") # 在这个例子中,我们没有更新 current_rdd,只是为了演示对它的重复使用 # 在真实算法中,可能是 current_rdd = some_function(current_rdd, other_data)end_time_no_cache = time.time()print(f\"不使用 cache 的总耗时: { end_time_no_cache - start_time_no_cache:.2f} 秒\\n\")# --- 场景二:使用 cache() ---print(\"--- 开始执行迭代计算 (使用 cache) ---\")# 在第一次转换后,对需要重复使用的 RDD 进行缓存# cache() 本身也是一个转换操作,是惰性的。它只标记了 RDD 需要被缓存。cached_initial_rdd = initial_rdd.cache()# 必须有第一个动作来触发计算和缓存的填充first_count = cached_initial_rdd.count() # 这个 count() 会触发 initial_rdd 的计算,并将结果填充到 Executor 的内存中print(f\"第一次触发动作,完成缓存填充,元素数量: { first_count}\")start_time_with_cache = time.time()current_rdd = cached_initial_rddfor i in range(5): enhanced = enhance_data(current_rdd) # 动作:count()。这次,enhanced 的计算会直接从内存中读取 cached_initial_rdd 的数据, # 而不是从头开始计算。 count_result = enhanced.count() print(f\"迭代 { i+1}: 元素数量 { count_result}\")end_time_with_cache = time.time()print(f\"使用 cache 的总耗时 (不含首次填充): { end_time_with_cache - start_time_with_cache:.2f} 秒\")# 使用完毕后,可以手动释放缓存以节约内存cached_initial_rdd.unpersist()print(\"\\n缓存已释放。\")spark.stop()
在这个例子中,不使用 cache()
的版本,每次循环都会重新计算 initial_rdd
,总耗时大约是 5 次 enhance_data
的时间。而使用 cache()
的版本,只有第一次 count()
会触发完整的计算,后续的循环都直接从内存读取数据,速度会快得多。
何时以及如何使用 persist()
?
- 黄金法则: 当你准备对一个 RDD 执行超过一次动作时,就应该考虑对其进行持久化。
- 选择存储级别: 始终从
persist(StorageLevel.MEMORY_AND_DISK)
开始。这是一个安全且高效的默认选项。只有当你非常确定数据能放入内存,并且对性能要求极致时,才使用MEMORY_ONLY
。当你发现内存成为瓶颈,并且 RDD 中有大量冗余信息时,可以尝试_SER
版本。 unpersist()
: 当你确定不再需要一个持久化的 RDD 时,调用rdd.unpersist()
是一个好习惯。这会告诉 Spark 释放它占用的内存和磁盘空间,为后续的计算腾出资源。
2.4.2 checkpoint()
:为超长血缘图谱设置“存档点”
cache()
和 persist()
解决了重算问题,但它们有一个共同的特点:它们不会切断 RDD 的血缘关系(Lineage)。这意味着,即使 RDD 被缓存了,Spark 仍然保留着计算出它的完整 DAG。如果一个 Executor 节点崩溃,它上面缓存的分区就会丢失。Spark 会利用血缘关系,从父 RDD 开始重新计算丢失的分区来恢复数据。
但是,当一个 RDD 的血缘图谱变得极度长且复杂时(例如,经过了上百次转换),这种依赖血缘的容错机制也会成为一个问题:
- 恢复时间过长: 一旦有分区丢失,重算的链条会非常长,耗时很久。
- Driver 内存压力: Driver 需要在内存中维护整个 DAG。过长的 DAG 可能会导致 Driver 本身出现
StackOverflowError
。
为了解决这个问题,Spark 提供了 checkpoint()
。Checkpoint 是一种更“强硬”的持久化。它的工作机制是:
- 将 RDD 的数据完整地计算出来。
- 将计算结果保存到一个可靠的、支持容错的分布式文件系统中(通常是 HDFS)。
- 切断并丢弃该 RDD 之前的所有血缘关系图谱。这个被 checkpoint 的 RDD,现在成为了一个新的、没有父 RDD 的“根”RDD。
checkpoint()
的使用方法与最佳实践:
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"CheckpointExample\").master(\"local\").getOrCreate()sc = spark.sparkContext# 1. 设置 Checkpoint 目录# 必须在使用 checkpoint() 之前设置一个目录,用于存放 checkpoint 文件# 这个目录必须是 HDFS 或其他可靠的文件系统路径# 这里为了本地演示,我们使用一个本地目录import tempfilecheckpoint_dir = tempfile.mkdtemp()sc.setCheckpointDir(checkpoint_dir)print(f\"Checkpoint 目录设置为: { checkpoint_dir}\")# 创建一个 RDD,并经过一系列非常复杂的转换rdd = sc.parallelize(range(100), 4)complex_rdd = rdd.map(lambda x: (x % 10, x*x)) \\ .reduceByKey(lambda a, b: a + b) \\ .flatMap(lambda kv: [kv[1]] * kv[0]) \\ .filter(lambda x: x > 1000)# 假设这是一个计算成本很高的 RDD,我们不想在每次使用时都重算# 2. 对 RDD 调用 checkpoint()# checkpoint() 和 cache() 一样,也是一个惰性操作complex_rdd.checkpoint()# 3. 关键!必须触发一个动作来实际执行 checkpoint# 如果不调用动作,checkpoint 文件是不会被创建的!# 这是一个非常容易犯的错误。checkpoint_result = complex_rdd.count()print(f\"第一次触发动作,实际执行了 checkpoint,结果数量: { checkpoint_result}\")# 4. 验证 checkpoint 是否生效# isCheckpointed() 方法可以检查一个 RDD 是否已经被 checkpointprint(f\"complex_rdd 是否已 checkpointed: { complex_rdd.isCheckpointed()}\")# toDebugString() 可以查看 RDD 的血缘关系# 你会发现,checkpointed 之后,它的依赖关系被切断了print(f\"complex_rdd 的血缘关系:\\n{ complex_rdd.toDebugString().decode()}\")# 5. 再次使用该 RDD# 这次它会直接从 checkpoint 目录中读取数据,而不是重算another_result = complex_rdd.sum()print(f\"再次使用 RDD,计算总和: { another_result}\")# **最佳实践:cache() 与 checkpoint() 联用**# 问题:单独调用 checkpoint(),RDD 会被计算两次。# 第一次:当触发动作时,Spark 从头计算 RDD。# 第二次:Spark 发现这个 RDD 需要被 checkpoint,于是它启动一个独立的作业,再次从头计算一遍 RDD,并将结果写入 checkpoint 目录。# 解决方案:best_practice_rdd = complex_rdd.cache() # 先标记为缓存best_practice_rdd.checkpoint() # 再标记为 checkpoint# 然后触发动作best_practice_rdd.count()# 工作流程:# 1. Spark 计算 RDD,因为需要被 cache,所以将结果放入内存。# 2. Spark 发现 RDD 也需要被 checkpoint,它会直接从内存中读取缓存的数据,然后写入 checkpoint 目录。# 这样就避免了第二次重算。spark.stop()
persist()
vs. checkpoint()
的终极对比
persist()
/ cache()
checkpoint()
cache()
联用是最佳实践。在绝大多数情况下,cache()
或 persist(StorageLevel.MEMORY_AND_DISK)
是你需要的工具。只有当你明确地意识到 RDD 的血缘关系已经长到可能引发问题时,才需要动用 checkpoint()
这个“重型武器”。
第三部分:PySpark SQL 与 DataFrame - 结构化数据分析的利器
如果说 RDD 是 Spark 的“汇编语言”,提供了最底层、最灵活的分布式操作能力,那么 DataFrame API 就是 Spark 的“高级语言”(如 Python 或 Java)。它为处理结构化和半结构化数据带来了前所未有的简洁性、高性能和可优化性。对于绝大多数数据分析和 ETL 任务,DataFrame 都是比 RDD 更好的选择。本部分将深入探索 DataFrame 的世界,从其设计理念到高级应用,揭示其高性能背后的秘密。
第三章:从 RDD 到 DataFrame:一场结构化的革命
3.1 DataFrame 是什么:超越 RDD 的“模式”与“优化”
DataFrame,从概念上讲,可以被看作是一个带有“模式”(Schema)的、不可变的分布式数据集,其数据被组织成一系列命名的列(Columns)。如果你熟悉 pandas
的 DataFrame 或者关系型数据库中的表,那么 Spark 的 DataFrame 会让你感到非常亲切。它就像一张分布在成百上千台机器上的巨大表格。
但 DataFrame 绝不仅仅是“带列名的 RDD”。它的出现,是 Spark 演进过程中的一次范式转移,带来了两大革命性的优势:
-
模式(Schema)的引入:
- RDD 的局限性: RDD 是“类型无知”的。对于 Spark 来说,一个 RDD
sc.parallelize([(1, \"Alice\"), (2, \"Bob\")])
内部只是一堆无差别的 Java 对象(在 PySpark 中是序列化后的 Python 对象)。Spark 不知道第一个元素是整数id
,第二个元素是字符串name
。这意味着,当你执行rdd.map(lambda t: t[0] + 1)
这样的操作时,其合法性检查只能在运行时(Runtime)进行,并且 Spark 无法利用数据类型信息进行优化。 - DataFrame 的优势: DataFrame 强制要求数据必须有关联的模式。模式定义了每一列的名称(如 “id”, “name”)和数据类型(如
IntegerType
,StringType
)。这个模式信息为 Spark 提供了关于数据结构的丰富元数据。
- RDD 的局限性: RDD 是“类型无知”的。对于 Spark 来说,一个 RDD
-
Catalyst 优化器与 Tungsten 执行引擎:
- 查询优化: 有了模式,Spark SQL 的核心——Catalyst 优化器——就有了大展拳脚的空间。当你使用 DataFrame API(或纯 SQL 语句)构建一个查询时,Catalyst 会将你的查询逻辑转换成一个抽象语法树(AST),然后应用一系列复杂的、基于规则的优化策略(如谓词下推、列裁剪、常量折叠等),最终生成一个最优化的物理执行计划。这个过程对用户是完全透明的。
- 高效执行: 生成的物理计划最终会在 Tungsten 执行引擎上运行。Tungsten 是 Spark 的物理执行后端,它通过一系列底层技术(如直接操作二进制数据、避免 JVM 对象开销、为现代 CPU 生成优化代码等)极大地提升了执行效率。
总结来说,RDD 和 DataFrame 的核心区别在于:
- RDD API (
rdd.map
,rdd.filter
): 你告诉 Spark “如何做”(How)。你的代码直接定义了操作的执行逻辑。 - DataFrame API (
df.select
,df.where
): 你告诉 Spark “做什么”(What)。你声明性地描述你想要的结果,而由 Catalyst 优化器去决定**“如何以最优的方式去做”**。
这种从“命令式”到“声明式”的转变,将大量的性能优化工作从开发者手中解放出来,交给了 Spark 引擎本身,使得即便是没有深厚分布式系统背景的分析师和开发者,也能编写出高性能的分布式代码。
创建 DataFrame:多种数据源的统一入口
SparkSession
对象是创建 DataFrame 的统一入口。它的 read
属性提供了一个 DataFrameReader
接口,可以从多种数据源加载数据并自动推断或指定模式。
from pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType, S