python从入门到精通:pyspark实战分析
前言
spark:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。简单来说,Spark是一款分布式的计算框架,用于调度成本上千的服务器集群,计算TB、PB乃至EB级别的海量数据。
同时Spark作为全球顶级的分布式计算框架,支持众多编程语言进行开发。而python语言,则是Spark重点支持的方向。
Spark对python语言的支持,重点体现在python第三方库:pyspark上。pyspark是由Spark官方开发的python语言第三方库。python开发者可以使用pip程序快速安装pyspark并像其他三方库那样直接使用。
pyspark的两种用法:
1、作为python库进行数据处理
2、提交至spark集群进行分布式集群计算
1、基础准备
pyspark库的安装:
在终端输入:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
构建pyspark执行环境入口对象:
想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口对象。pyspark的执行环境入口对象是:类 SparkContext 的类对象
# 导包from pyspark import SparkConf,SparkContext# 创建SparkConf类对象conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")#基于sparkconf类对象创建sparkcontext类对象sc = SparkContext(conf=conf)# 打印pyspark的运行版本print(sc.version)# 停止sparkcontext对象的运行(停止程序)sc.stop()
在这需要注意的是,我们需要配置环境,具体可以看这篇文章:链接
pyspark的编程模型:SparkContext类对象,是pyspark编程中一切功能的入口。pyspark的编程,主要有如下三大步骤:
1、数据输入:通过SparkContext类对象的成员方法,完成数据的读取操作,读取后得到RDD类对象
2、数据处理计算:通过RDD类对象的成员方法完成各种数据计算的需求
3、数据输出:将处理完成后的RDD对象调用各种成员方法完成写出文件、转化为list等操作。
2、数据输入
RDD对象:如图可见,pyspark支持对中数据的输入,再输入完成后,都会得到一个:RDD类对象。RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)。
pyspark针对数据的处理,都是以RDD对象作为载体,即:
数据存储在RDD内
各类数据的计算方法,也都是RDD的成员方法
RDD的数据计算方法,返回值依旧是RDD对象
pyspark支持通过SparkContext对象的parallelize成员方法,将:list、tuple、dict、set、str转换为pyspark的RDD类对象。
from pyspark import SparkContext,SparkConfconf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1,2,3,4,5])rdd2 = sc.parallelize((1,2,3,4,5))rdd3 = sc.parallelize(\"abcdef\")rdd4 = sc.parallelize({1,2,3,4,5})rdd5 = sc.parallelize({\"name\":\"xiaodu\",\"age\":23})print(rdd1.collect())print(rdd2.collect())print(rdd3.collect())print(rdd4.collect())print(rdd5.collect())sc.stop()
注意:字符串会被拆分为一个个的字符,存入RDD对象;字典仅有key会被存入RDD对象。
读取文件转RDD对象:
pyspark也支持SparkContext入口对象,来读取文件,构建出RDD对象。
# 读取文件数据输出from pyspark import SparkContext,SparkConfconf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.textFile(\"test.txt\")print(rdd.collect())sc.stop()
3、数据计算
pyspark的数据计算,都是基于RDD对象来进行的,我们这里列举几个常用数据计算的常用的成员方法(算子)。
3.1、map方法
功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD。
rdd.map(func)
# func: f:(T) -> U
# (T) -> U 表示的是方法的定义:
# ( ) 表示传入参数,(T) 表示传入1个参数, () 表示没有传入参数
# T 是泛型的代称,在这里表示任意类型
# U 也是泛型的代称,在这里表示任意类型
# -> U 表示返回值
# (T) -> U 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入参数类型不限。返回一个返回值,返回值类型不限。
# (A) -> A 总结起来的意思是:这是一个方法,这个方法接受一个参数传入,传入参数类型不限。返回一个返回值,返回值和传入参数类型一致。
from pyspark import SparkContext,SparkConfimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"# spark可能会找不到python解释器,所以我们需要加上上面这句话conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])# 通过map方法将全部数据都乘以10加1rdd1= rdd.map(lambda x: x * 10 +1)print(rdd1.collect())sc.stop()
需要注意的是:我们python解释器的版本不能过高,如果过高会出现:Python worker exited unexpectedly (crashed)的Bug,需要降低python解释器版本。
3.2、flatMap方法
功能:对rdd执行map操作,然后进行解除嵌套的操作
# flatMap方法from pyspark import SparkContext,SparkConfimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.parallelize([\"hehe\" \"halou\" \"nihao\",\"python pyspark java\",\"C C++ C#\"])rdd1 = rdd.flatMap(lambda x: x.split())print(rdd1.collect())sc.stop()
3.3、reduceByKey方法
功能:针对KV型(二元元组)RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(value)的聚合操作。
rdd.reduceByKey(func):
# func: (V,V) -> V
# 接受两个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。
# reduceByKey方法from pyspark import SparkContext,SparkConfimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.parallelize((\"a\",1),(\"a\",1),(\"b\",2),(\"b\",1))rdd1 = rdd.reduceByKey(lambda x,y:x+y)print(rdd1.collect())sc.stop()# 结果:[(\'b\',3),(\'a\',2)]
案例1:使用上面一系列算子,统计文件\"test.txt\"文件中单词出现数量
from pyspark import SparkContext,SparkConfimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.textFile(\"test.txt\")rdd1 = rdd.flatMap(lambda line: line.split(\",\")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)print(rdd.collect())sc.stop()
3.4、filter方法
功能:过滤想要的数据进行保留
rdd.filter(func)
# func: (T) -> bool 传入一个任意类型的参数,返回值是布尔类型
返回值是True的数据被保留,False的数据被丢弃
from pyspark import SparkContext,SparkConfimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5])# 保留偶数,丢弃奇数def func(data): if data % 2 == 0: return Truerdd1 = rdd.filter(func)rdd2 = rdd.filter(lambda x: x % 2 == 0)print(rdd1.collect())print(rdd2.collect())sc.stop()
3.5、distinct方法
功能:对RDD数据进行去重,返回新的RDD
rdd.distinct() 无需传参
from pyspark import SparkContext,SparkConfimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,2,3,3,4,5,5])rdd1 = rdd.distinct()print(rdd1.collect())sc.stop()# 结果为:[1,2,3,4,5]
3.6、sortBy方法
功能:对RDD数据进行排序,基于直盯盯地排序顺序
rdd.sortBy(func,ascending=False,numPartitions=1)
# func: (T) -> U: 告知按照rdd中的那个顺序进行排序,比如 lambda x: x[1]
# ascending True表示升序,False表示降序
# numPartitions:用多少分区排序
from pyspark import SparkContext,SparkConfimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf = SparkConf().setMaster(\"local[*]\").setAppName(\"test_spark_app\")sc = SparkContext(conf=conf)rdd = sc.parallelize([(\"haha\",1),(\"hehe\",3),(\"python\",4),(\"spark\",2)])rdd1 = rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1) # 按照降序进行排序print(rdd1.collect())sc.stop()
4、数据输出
4.1、输出为python对象
collect算子,功能:将RDD个各个分区内的数据,同意收集到Driver,形成一个list对象
rdd.collect()
# 返回值是一个list
from pyspark import SparkConf,SparkContextimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf =SparkConf().setMaster(\"local[*]\").setAppName(\"pyspark_test\")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5])print(rdd.collect())print(type(rdd.collect()))sc.stop()
reduce算子,功能:对RDD数据集按照传入的逻辑进行聚合(有点类似于reduceByKey,但reduce只聚合并不会对key进行分组)。
rdd.reduce(func)
# func: (T,T) -> T
# 两个参数传入一个返回值,返回值和参数要求类型一致
from pyspark import SparkConf,SparkContextimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf =SparkConf().setMaster(\"local[*]\").setAppName(\"pyspark_test\")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5])print(rdd.reduce(lambda x,y:x+y))sc.stop()
take算子,功能:取RDD的前N个元素,组合成list返回给你。
# 比如:sc.parallelize([1,2,3,4,5,6]).take(5)
# 结果为:[1,2,3,4,5]
from pyspark import SparkConf,SparkContextimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf =SparkConf().setMaster(\"local[*]\").setAppName(\"pyspark_test\")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5])take_rdd = rdd.take(3)print(rdd.collect())sc.stop()
count算子,功能:计算RDD有多少条数据,返回值是一个数字
sc.parallelize([1,2,3,4,5]).count()
# 结果为:6
from pyspark import SparkConf,SparkContextimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"conf =SparkConf().setMaster(\"local[*]\").setAppName(\"pyspark_test\")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5])num_rdd=rdd.count()print(f\"rdd中元素的数量为{num_rdd}\")sc.stop()
4.2、输出到文件
saveAsTextFile算子,功能:将RDD的数据写入文本文件中,支持本地写出,hdfs等文件系统。
from pyspark import SparkConf,SparkContextimport osos.environ[\'PYSPARK_PYTHON\'] = \"D:/python学习/python/python.exe\"os.environ[\'HADOOP_HOME\'] = \"D:/hadoop/hadoop.tar/hadoop-3.0.0/hadoop-3.0.0\"conf =SparkConf().setMaster(\"local[*]\").setAppName(\"pyspark_test\")sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1,2,3,4,5])rdd2 = sc.parallelize([(\"hello\",3),(\"spark\",5),(\"hi\",7)])rdd3 = sc.parallelize([[1,2,3],[2,4,5],[5,7,9]])# 输出到文件rdd1.saveAsTextFile(\"D:/python学习/python_study/pythonProject/output1\")rdd2.saveAsTextFile(\"D:/python学习/python_study/pythonProject/output2\")rdd3.saveAsTextFile(\"D:/python学习/python_study/pythonProject/output3\")
这里需要安装Hadoop,并配置环境,但具体如何配置环境就不在这里细说了:
Hadoop:hhttp://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gzwinutils.exe:https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
hadoop.dllhttps://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
修改rdd分区为1个:
方式1,SparkConf对象设置属性全局并行度为1:
conf = SparkConf().setMaster(\"local[*]\'\"),setAppName(\"test_spark\")
conf.set(\"spark.default.parallelism\",\"1\")
sc = SparkConText(conf=conf)
方式2,创建RDD对象的时候设置(parallelize方法传入numSlices的参数为1)
rdd1 = sc.parallelize([1,2,3,4,5],numSlices=1)
rdd2 = sc.parallelize([1,2,3,4,5],1)