> 技术文档 > 大数据技术之Spark_大数据spark

大数据技术之Spark_大数据spark


1、Spark介绍

1.1、Spark是什么

Spark是什么定义:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎

Spark最早源于一篇论文 Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, 该论文是由加州大学柏克莱分校的 Matei Zaharia 等人发表的。论文中提出了一种弹性分布式数据集(即 RDD)的概念。

翻译过来就是:RDD 是一种分布式内存抽象,其使得程序员能够在大规模集群中做内存运算,并且有一定的容错方式。而这也是整个Spark的核心数据结构,Spark 整个平台都围绕着RDD进行。

简而言之,Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提 高了运行速度、并提供丰富的操作数据的API提高了开发速度。

Spark是一款分布式内存计算的统一分析引擎。 其特点就是对任意类型的数据进行自定义计算。

Spark可以计算:结构化、半结构化、非结构化等各种类型的数据结构,同时也支持使用Python、Java、Scala、R以及SQL语言去开发应用 程序计算数据。

Spark的适用面非常广泛,所以,被称之为 统一的(适用面广)的分析引擎(数据处理)

1.2、Spark风雨十年
Spark 是加州大学伯克利分校AMP实验室(Algorithms Machines and People Lab)开发的通用大数据处理框架。 Spark的发展历史,经历过几大重要阶段,如下图所示:

Stack Overflow的数据可以看出,2015年开始Spark每月的问题提交数量已经超越Hadoop,而2018年Spark Python版本的APIPySpark每月的问题提交数量也已超过Hadoop。2019年排名Spark第一,PySpark第二;而十年的累计排名是Spark第一,PySpark第 三。按照这个趋势发展下去,Spark和PySpark在未来很长一段时间内应该还会处于垄断地位。

十年走来,Spark目前已经迭代到了3.2.0版本(2021.10.13发布),本次课程基于最新的Spark 3.2.0版本进行授课

1.3、Spark VS Hadoop(MapReduce)

Spark和前面学习的Hadoop技术栈有何区别呢?

尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop

在计算层面,Spark相比较MR(MapReduce)有巨大的性能优势,但至今仍有许多计算工具基于MR构架,比如非常成熟的Hive

Spark仅做计算,而Hadoop生态圈不仅有计算(MR)也有存储(HDFS)和资源管理调度(YARN),HDFS和YARN仍是许多大数据 体系的核心架构。

1.4、Spark四大特点

速度快:由于Apache Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。

Spark处理数据与MapReduce处理数据相比,有如下两个不同点:

其一、Spark处理数据时,可以将中间处理结果数据存储到内存中;

其二、Spark 提供了非常丰富的算子(API), 可以做到复杂任务在一个Spark 程序中完成.

易于使用:Spark 的版本已经更新到 Spark 3.2.0(截止日期2021.10.13),支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言。为了 兼容Spark2.x企业级应用场景,Spark仍然持续更新Spark2版本。

通用性强:在 Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,我们可以在一个应用中无缝 地使用这些工具库。

运行方式:Spark 支持多种运行方式,包括在 Hadoop 和 Mesos 上,也支持 Standalone的独立运行模式,同时也可以运行在云Kubernetes(Spark2.3开始支持)上。

对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。

1.5、Spark 框架模块-了解
整个Spark 框架模块包含:Spark Core、 Spark SQL、 Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上

Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、 Scala、R语言的API,可以编程进行海量离线数据批处理计算。

SparkSQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparkSQL本身针对离线计算场景。同 时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。

SparkStreaming:以SparkCore为基础,提供数据的流式计算功能。

MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。

GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。

1.6、Spark的运行模式 - 了解

Spark提供多种运行模式,包括:

本地模式(单机)本地模式就是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境

Standalone模式(集群)Spark中的各个角色以独立进程的形式存在,并组成Spark集群环境

Hadoop YARN模式(集群)Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境

Kubernetes模式(容器集群)Spark中的各个角色运行在Kubernetes的容器内部,并组成Spark集群环境

云服务模式(运行在云平台上)......

1.7、Spark的架构角色 - 理解

YARN主要有4类角色,从2个层面去看

image-20230527221210760

Spark运行角色

注意正常情况下Executor是干活的角色,不过在Local模式下,Driver即管理又干活

2、Spark环境搭建-Local

2.1、课程服务器环境

2.2、基本原理

本质:启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task

Local模式可以限制模拟Spark集群环境的线程数量, 即Local[N] 或 Local[*]

其中N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N, 则默认是1个线程(该线程有1个core)。 通常Cpu有几个Core,就指定几个 线程,最大化利用计算能力.*

如果是local[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine.按照Cpu最多的Cores设置线程数

Local 下的角色分布:

资源管理:

Master:Local进程本身

Worker:Local进程本身

任务执行:

Driver:Local进程本身

Executor:不存在,没有独立的Executor角色, 由Local进程(也就是Driver)内的线程提供计算能力

PS: Driver也算一种特殊的Executor, 只不过多数时候, 我们将Executor当做纯Worker对待, 这样和Driver好区分(一类是管理 一类是工人)

2.3、搭建环境
开箱即用:直接启动bin目录下的

运行成功以后,有如下提示信息:

  • sc:SparkContext实例对象:

  • spark:SparkSession实例对象

  • 4040:Web监控页面端口号

2.4、基于bin/pyspark

程序, 可以提供一个 交互式的 Python解释器环境, 在这里面可以用Python语言调用Spark API 进行计算

4040端口是一个WEBUI端口, 可以在浏览器内打开, 输入:服务器ip:4040 即可打开

打开监控页面后, 可以发现 在程序内仅有一个Driver因为我们是Local模式, Driver即管理又干活. 同时, 输入jps可以看到local模式下的唯一进程存在 这个进程 即是master也是worker

2.5、基于bin/spark-submit测试
bin/spark-submit程序, 作用: 提交指定的Spark代码到Spark环境中运行

pyspark/spark-shell/spark-submit 对比

2.6、基于spark-shell

3、Spark环境搭建-Standalone
3.1、Standalone 架构
Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模 式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。

StandAlone 是完整的Spark运行环境,其中:

Master角色以Master进程存在, Worker角色以Worker进程存在

Driver角色运行时候有可能存在Master节点上,也有可能不存在一起,Executor运行于Worker进程内,属于Worker一个子进程 由Worker提供资源供给它们运行

StandAlone集群在进程上主要有3类进程:

主节点Master进程:

Master角色, 管理整个集群资源,并托管运行各个任务的Driver

从节点Workers:

Worker角色, 管理每个机器的资源,分配对应的资源来运行Executor(Task);每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数

历史服务器HistoryServer(可选):

Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。

3.2、环境搭建
3.2.1、安装Spark

上传Spark到/opt/software下

解压到/opt/module下,

进入到/opt/module下,把文件夹重名为spark

3.2.2、安装JDK和Python

JDK前提安装完毕

上传Anaconda3-2021.05-Linux-x86_64.sh

安装Anaconda3-2021.05-Linux-x86_64.sh

之间回车就可以

输入Yes

输入安装路径

输入yes进行初始化

退出机器,重新进入

3.2.3、配置国内源

如果你安装好后, 可以打开:vim ~/.condarc这个文件, 追加如下内容:

3.2.4、创建虚拟环境

3.3、环境变量

3.3.1、配置Spark

编辑 vim /etc/profile.d/my_env.sh

编辑 vim ~/.bashrc

3.4、StandAlone部署
3.4.1、集群规划

3.4.2、在所有机器安装Anaconda3
3.4.3、编辑配置文件

进入到/opt/module/spark

  • workers

  • 配置spark-env.sh文件

在HDFS上创建程序运行历史记录存放的文件夹:

查看Master的WEB UI

默认端口master我们设置到了8080

如果端口被占用, 会顺延到8081 ...;8082... 8083... 直到申请到端口为止

可以在日志中查看, 具体顺延到哪个端口上:

Spark 应用架构从图中可以看到Spark Application运行到集群上时,由两部分组成:Driver Program和Executors。

第一、Driver Program

相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;

运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;

一个SparkApplication仅有一个;

第二、Executors

相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,一个Task任务运行需要1 Core CPU,所 有可以认为Executor中线程数就等于CPU Core核数;

一个Spark Application可以有多个,可以设置个数和资源信息;

4、Standalone HA

4.1、高可用HA

Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题

如何解决这个单点故障的问题,Spark提供了两种方案:

基于文件系统的单点恢复(Single-Node Recovery with Local File System)--只能用于开发或测试环境。

基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)--可以用于生产环境。ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对 于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。

4.2、基于Zookeeper实现HA

5、Spark On YARN 环境搭建
5.1、引言
按照前面环境部署中所学习的, 如果我们想要一个稳定的生产Spark环境, 那么最优的选择就是构建:HA StandAlone集群.

不过在企业中, 服务器的资源总是紧张的, 许多企业不管做什么业务,都基本上会有Hadoop集群. 也就是会有YARN集群.

对于企业来说,在已有YARN集群的前提下在单独准备Spark StandAlone集群,对资源的利用就不高. 所以, 在企业中,多 数场景下,会将Spark运行到YARN集群中.

YARN本身是一个资源调度框架, 负责对运行在内部的计算框架进行资源调度管理. 作为典型的计算框架, Spark本身也是直接运行在YARN中, 并接受YARN的调度的.

所以, 对于Spark On YARN, 无需部署Spark集群, 只要找一台服务器, 充当Spark的客户端, 即可提交任务到YARN集群中运行.

5.2、本质

Spark On Yarn的本质?

Master角色由YARN的ResourceManager担任

Worker角色由YARN的NodeManager担任

Driver角色运行在YARN容器内 或 提交任务的客户端进程

真正干活的Executor运行在YARN提供的容器内

5.3、配置
确保:

HADOOP_CONF_DIR

YARN_CONF_DIR

在spark-env.sh 以及 环境变量配置文件中即可

5.4、测试
Spark On YARN是有两种运行模式的,

一种是Cluster模式一种是Client模式.这两种模式的区别就是Driver运行的位置.

Cluster模式即:Driver运行在YARN容器内部, 和ApplicationMaster在同一个容器内

Client模式即:Driver运行在客户端进程中, 比如Driver运行在spark-submit程序的进程中

如图, 此为Cluster模式Driver运行在容器内部

如图, 此为Client模式Driver运行在客户端程序进程中(以spark-submit为例)

两种模式的区别

5.6、Spark On Yarn两种模式总结
Client模式和Cluster模式最最本质的区别是:Driver程序运行在哪里。

Client模式:学习测试时使用,生产不推荐(要用也可以,性能略低,稳定性略低)

Driver运行在Client上,和集群的通信成本高

Driver输出结果会在客户端显示

Cluster模式:生产环境中使用该模式

Driver程序在YARN集群中,和集群的通信成本低

Driver输出结果不能在客户端显示

该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启 ApplicattionMaster(Driver)

在YARN Client模式下,Driver在任务提交的本地机器上运行,示意图如下

在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如

6、Pyspark
我们前面使用过bin/pyspark 程序, 要注意, 这个只是一个应用程序, 提供一个Python解释器执行环境来运行Spark任务 我们现在说的PySpark, 指的是Python的运行类库, 是可以在Python代码中:import pyspark

PySpark 是Spark官方提供的一个Python类库, 内置了完全的Spark API, 可以通过PySpark类库来编写Spark应用程序, 并将其提交到Spark集群中运行.下图是,PySpark类库和标准Spark框架的简单对比

6.3、连接集群环境

通过pycharm--->file--->settings

7、PySpark开发入口
Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:

第一步、创建SparkConf对象设置Spark Application基本信息,比如应用的名称AppName和应用运行Master

第二步、基于SparkConf对象,创建SparkContext对象

文档:http://spark.apache.org/docs/3.1.2/rdd-programming-guide.html

原理分析

7.2、提交到集群运行

注意:去掉setMaster(\"local[*]\")

7.3、Python On Spark 执行原理

PySpark宗旨是在不破坏Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示。

8、分布式代码执行分析

8.1、Spark 集群角色回顾(以YARN为例)

8.2、分布式代码执行分析
Spark Application应用程序运行时,无论client还是cluster部署模式DeployMode,当Driver Program和Executors启动完成以后,就要开始执行应用程序中MAIN函数的代码,以词频统计WordCount程序为例剖析讲解

第一、构建SparkContex对象和关闭SparkContext资源,都是在Driver Program中执行,上图中①和③都是,如 下图所示:

第二、上图中②的加载数据【A】、处理数据【B】和输出数据【C】代码,都在Executors上执行,从WEB UI监控 页面可以看到此Job(RDD#action触发一个Job)对应DAG图,如下所示: