> 技术文档 > Spark RDD的创建与常用转换

Spark RDD的创建与常用转换


1. 从集合创建RDD

使用parallelize()方法可以将本地集合转换为RDD:

# 从列表创建RDD
a = [1,2,3,4,5]
rdd1 = sc.parallelize(a)

# 从元组创建RDD
b = (1,2,3,4,5)
rdd2 = sc.parallelize(b)

# 指定分区
rdd3 = sc.parallelize([1,2,3,4,5], 2)
print(rdd3.getNumPartitions())  # 输出分区数

2. 从文本文件创建RDD

Spark支持从本地文件系统或HDFS读取文件创建RDD:

# 从本地文件创建
rdd1 = sc.textFile(\"file:///home/spark/mydata/a.txt\")

# 从HDFS创建
rdd2 = sc.textFile(\"hdfs:///myfile/mydata/a.txt\")

# 读取目录下所有txt文件
rdd3 = sc.textFile(\"file:///home/spark/mydata/*.txt\")

# 读取压缩文件
rdd4 = sc.textFile(\"file:///home/spark/mydata/a.txt.gz\")

3. map操作

map()对RDD中的每个元素应用一个函数,返回一个新的RDD:

# 基本map操作
rdd1 = sc.parallelize(range(1,5))
rdd2 = rdd1.map(lambda x: x+1)
print(rdd2.collect())  # [2, 3, 4, 5]

# 字符串处理
rdd3 = sc.parallelize([\"aa\", \"bb\", \"cc\", \"dd\"])
rdd4 = rdd3.map(lambda x: (x, 1))
print(rdd4.collect())  # [(\'aa\', 1), (\'bb\', 1), (\'cc\', 1), (\'dd\', 1)]

# 使用自定义函数
def trans(x):
    return (x, 1)

rdd5 = rdd3.map(trans)
print(rdd5.collect())

4. flatMap操作

flatMap()map()类似,但会将返回的迭代器\"展平\":

rdd1 = sc.parallelize([1,2,3])
rdd2 = rdd1.map(lambda x: [x, x])
rdd3 = rdd1.flatMap(lambda x: [x, x])

print(rdd2.collect())  # [[1, 1], [2, 2], [3, 3]]
print(rdd3.collect())  # [1, 1, 2, 2, 3, 3]

5. filter操作

filter()用于筛选满足条件的元素:

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd1.filter(lambda x: x % 2 == 0)
print(rdd2.collect())  # [2, 4]

6. sortBy操作

sortBy()用于对RDD进行排序:

# 基本排序
rdd1 = sc.parallelize([1, 5, -2, 10, 8])
rdd2 = rdd1.sortBy(lambda x: x)  # 升序
rdd3 = rdd1.sortBy(lambda x: x, False)  # 降序

print(rdd2.collect())  # [-2, 1, 5, 8, 10]
print(rdd3.collect())  # [10, 8, 5, 1, -2]

# 复杂对象排序
stu = [(\"张婷\", \"女\", 19, \"2019级\"), (\"刘思思\", \"男\", 22, \"2018级\")]
rdd4 = sc.parallelize(stu)
rdd5 = rdd4.sortBy(lambda x: x[3])  # 按年级排序
print(rdd5.collect())  # [(\'刘思思\', \'男\', 22, \'2018级\'), (\'张婷\', \'女\', 19, \'2019级\')]