Spark RDD的创建与常用转换
1. 从集合创建RDD
使用parallelize()
方法可以将本地集合转换为RDD:
# 从列表创建RDD
a = [1,2,3,4,5]
rdd1 = sc.parallelize(a)
# 从元组创建RDD
b = (1,2,3,4,5)
rdd2 = sc.parallelize(b)
# 指定分区数
rdd3 = sc.parallelize([1,2,3,4,5], 2)
print(rdd3.getNumPartitions()) # 输出分区数
2. 从文本文件创建RDD
Spark支持从本地文件系统或HDFS读取文件创建RDD:
# 从本地文件创建
rdd1 = sc.textFile(\"file:///home/spark/mydata/a.txt\")
# 从HDFS创建
rdd2 = sc.textFile(\"hdfs:///myfile/mydata/a.txt\")
# 读取目录下所有txt文件
rdd3 = sc.textFile(\"file:///home/spark/mydata/*.txt\")
# 读取压缩文件
rdd4 = sc.textFile(\"file:///home/spark/mydata/a.txt.gz\")
3. map操作
map()
对RDD中的每个元素应用一个函数,返回一个新的RDD:
# 基本map操作
rdd1 = sc.parallelize(range(1,5))
rdd2 = rdd1.map(lambda x: x+1)
print(rdd2.collect()) # [2, 3, 4, 5]
# 字符串处理
rdd3 = sc.parallelize([\"aa\", \"bb\", \"cc\", \"dd\"])
rdd4 = rdd3.map(lambda x: (x, 1))
print(rdd4.collect()) # [(\'aa\', 1), (\'bb\', 1), (\'cc\', 1), (\'dd\', 1)]
# 使用自定义函数
def trans(x):
return (x, 1)
rdd5 = rdd3.map(trans)
print(rdd5.collect())
4. flatMap操作
flatMap()
与map()
类似,但会将返回的迭代器\"展平\":
rdd1 = sc.parallelize([1,2,3])
rdd2 = rdd1.map(lambda x: [x, x])
rdd3 = rdd1.flatMap(lambda x: [x, x])
print(rdd2.collect()) # [[1, 1], [2, 2], [3, 3]]
print(rdd3.collect()) # [1, 1, 2, 2, 3, 3]
5. filter操作
filter()
用于筛选满足条件的元素:
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd1.filter(lambda x: x % 2 == 0)
print(rdd2.collect()) # [2, 4]
6. sortBy操作
sortBy()
用于对RDD进行排序:
# 基本排序
rdd1 = sc.parallelize([1, 5, -2, 10, 8])
rdd2 = rdd1.sortBy(lambda x: x) # 升序
rdd3 = rdd1.sortBy(lambda x: x, False) # 降序
print(rdd2.collect()) # [-2, 1, 5, 8, 10]
print(rdd3.collect()) # [10, 8, 5, 1, -2]
# 复杂对象排序
stu = [(\"张婷\", \"女\", 19, \"2019级\"), (\"刘思思\", \"男\", 22, \"2018级\")]
rdd4 = sc.parallelize(stu)
rdd5 = rdd4.sortBy(lambda x: x[3]) # 按年级排序
print(rdd5.collect()) # [(\'刘思思\', \'男\', 22, \'2018级\'), (\'张婷\', \'女\', 19, \'2019级\')]