> 技术文档 > 【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20

【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20


一、Flink部署模式

由于在一些企业应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求,所以Flink为各种场景提供了不同的部署模式,主要包含以下三种模式。

会话模式(Session Mode):如图所示,在会话模式中,会先启动一个Flink集群保持一个会话,然后通过客户端提交Flink作业。由于Flink集群启动时所有资源都已经确定,所以通过客户端提交的所有作业会竞争Flink集群的资源。该模式比较适合单个作业规模较小,而执行时间短的大量作业的应用场景。
【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20

单作业模式(Per-Job Mode):由于会话模式的资源共享特性会导致很多问题,所以为了更好地隔离资源,可以考虑为提交的每个Flink作业启动一个集群,这种模式就是单作业模式,如图所示。在单作业模式中,作业运行完成之后,Flink集群就会关闭,所有资源也会被释放。这种特性使得单作业模式在生产环境中运行更加稳定,是实际应用中的首选模式,但是该模式仅仅支持YARN运行模式,并且在Flink1.15及以后的版本中被废弃掉了。
【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20

应用模式(Application Mode):由于会话模式和单作业模式的Flink应用代码都是在客户端执行,然后由客户端提交给JobManager,所以会造成客户端需要占用大量的网络带宽,特别是提交作业使用的是同一个客户端。如图所示,应用模式的解决办法是,直接将Flink应用提交到JobManager上运行。这就意味着我们需要为每一个提交的Flink应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个Flink应用而存在,应用执行结束之后JobManager也会随之关闭,这就是应用模式。
【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20

应用模式与单作业模式的相同之处:都会专门为一个Flink作业运行一个集群。不同之处:在单作业模式中,Flink作业的main方法在客户端运行,而在应用模式中,Flink作业的main方法在JobManager上执行。

二、Flink分布式集群规划

这里需要提前准备hadoop01、hadoop02、hadoop03三个节点来搭建Flink集群,hadoop01节点部署JobManager和TaskManager,hadoop02和hadoop03只部署TaskManager。
【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20

三、 Flink Standalone运行模式

1.Standalone运行模式概述
Standalone运行模式是Flink最基本的运行模式,它不需要依赖任何外部框架就可以独立工作。在这种运行模式下,所有的Flink组件(如JobManager、TaskManager等)都在集群中运行,形成一个独立的Flink集群。在三种部署模式中,Standalone运行模式支持会话模式和应用模式部署,不支持单作业模式部署。
【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20
Standalone会话模式下作业提交流程如上图所示。
(1)客户端将作业提交给JobManager,Dispatcher负责作业的接收和调度。
(2)Dispathcer为作业启动一个JobMaster,并将工作交给JobManager(更准确地说是交给了JobMaster)。
(3)JobMaster是作业的主控节点,负责作业的调度和管理。JobMaster接收到作业信息之后,会进行详情的解析和规划,然后向ResourceManager请求相应的资源。
(4)ResourceManager负责集群资源的分配和管理。当JobMaster请求资源时,ResourceManager会根据集群的当前状态和作业的资源需求来分配资源,并将分配的资源信息返回给Job Master。
(5)ResourceManager分配的资源其实来自于TaskManager,TaskManager来提供空闲的资源(Slot)用于Task的执行。
(6)当获取到足够的资源后,JobMaster会开始启动作业中的各个Task,将Task分发到已经分配好的TaskManager节点上,并监控Task的执行情况。

2.配置Standalone运行模式的集群
(1)下载并解压Flink
下载flink-1.20.0-bin-scala_2.12.tgz安装包(地址:https://flink.apache.org/downloads/),选择hadoop01作为安装节点,然后上传至hadoop01节点的 /home/hadoop/app目录下进行解压安装,操作命令如下。

[hadoop@hadoop01 app]$ lsflink-1.20.0-bin-scala_2.12.tgz#解压[hadoop@hadoop01 app]$ tar -zxvf flink-1.20.0-bin-scala_2.12.tgz#创建软连接[hadoop@hadoop01 app]$ ln -s flink-1.20.0 flink

(2)修改config.yaml配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改config.yaml配置文件,核心内容如下所示。

[hadoop@hadoop01 conf]$ vi config.yaml#jobmanager配置jobmanager: bind-host: 0.0.0.0 rpc: address: hadoop01#taskmanager配置taskmanager: bind-host: 0.0.0.0 host: hadoop01#Rest配置rest: address: hadoop01 bind-address: 0.0.0.0

(3)修改masters配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改masters配置文件,将hadoop01节点指定为JobManager角色,具体内容如下所示。

[hadoop@hadoop01 conf]$ vi mastershadoop01:8081

(4)修改workers配置文件
在hadoop01节点,进入Flink根目录下的conf文件夹中,修改workers配置文件,将hadoop01、hadoop02和hadoop03节点指定为JobManager角色,具体内容如下所示。

[hadoop@hadoop01 conf]$ vi workershadoop01hadoop02hadoop03

(5)分发安装目录
在hadoop01节点中,使用Linux远程拷贝命令scp将修改好的Flink配置文件,分别同步到hadoop02和hadoop03节点,具体操作如下所示。

#分发Flink配置文件[hadoop@hadoop01 app]$ scp -r flink-1.20.0 hadoop@hadoop02:/home/hadoop/app/[hadoop@hadoop01 app]$ scp -r flink-1.20.0 hadoop@hadoop03:/home/hadoop/app/#创建软连接[hadoop@hadoop02 app]$ ln -s flink-1.20.0 flink[hadoop@hadoop03 app]$ ln -s flink-1.20.0 flink

Flink配置文件同步完成之后,需要分别到hadoop02和hadoop03节点上修改config.yaml配置文件,具体修改内容如下所示。

[hadoop@hadoop02 conf]$ vi config.yamltaskmanager: host: hadoop02[hadoop@hadoop03 conf]$ vi config.yamltaskmanager: host: hadoop03

3.会话模式部署
在会话模式中,需要提前启动Flink集群,然后在客户端提交作业。这里以WordCount作业为例,具体操作步骤如下所示。
(1)启动集群
在hadoop01节点上,进入Flink的bin目录,启动Flink Standalone集群,具体操作如下所示。

[hadoop@hadoop01 flink]$ cd bin/[hadoop@hadoop01 bin]$ ./start-cluster.shStarting cluster.Starting standalonesession daemon on host hadoop01.Starting taskexecutor daemon on host hadoop01.Starting taskexecutor daemon on host hadoop02.Starting taskexecutor daemon on host hadoop03.

(2)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999

(3)提交Flink作业
将IDEA打好的learningFlink1.20-1.0-SNAPSHOT.jar包上传至hadoop01节点,然后通过Flink脚本提交WordCount作业,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/flink run -m hadoop01:8081 -c com.yangjun.WordCount /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jarJob has been submitted with JobID b6a1b3473c28b169735ebb24ab050547

(4)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999flinkflinkflink

(5)查看运行结果
由于hadoop01、hadoop02和hadoop03节点都是TaskManager角色,所以提交的WordCount作业可能运行在其中任何一个节点中。如果想查看WordCount作业的输出结果,需要逐个查看每个节点的日志文件输出结果。根据查看情况,定位到作业运行在hadoop01节点,此时可进入Flink的log目录查看输出结果,具体操作如下所示。

[hadoop@hadoop01 flink]$ cd log/[hadoop@hadoop01 log]$ tail flink-hadoop-taskexecutor-0-hadoop01.out (flink,1)(flink,2)(flink,3)

4.应用模式部署
在应用模式中,不会提前启动Flink集群,所以不能使用start-cluster.sh脚本启动集群服务。这里需要使用bin目录下的standalone-job.sh脚本单独启动JobManager服务,具体操作步骤如下所示。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999

(2)准备应用程序包
将IDEA打好的项目包上传至hadoop01节点的/home/hadoop/app/flink/lib目录下,查看Flink lib目录下的项目包操作如下所示。

[hadoop@hadoop01 lib]$ pwd/home/hadoop/app/flink/lib[hadoop@hadoop01 lib]$ lslearningFlink1.20-1.0-SNAPSHOT.jar

(3)启动JobManager服务
在hadoop01节点上,进入Flink的bin目录,启动JobManager服务并指定WordCount的作业入口,具体操作如下所示。

[hadoop@hadoop01 bin]$ ./standalone-job.sh start --job--classname com.yangjun.WordCountStarting standalonejob daemon on host hadoop01.

备注:这里通过–job–classname参数直接指定WordCount入口类,脚本会到lib目录中扫描所有的jar包,从而找到WordCount应用程序包。

(4)启动TaskManager服务
分别在hadoop01、hadoop02和hadoop03节点,进入Flink的bin目录启动TaskManager服务,具体操作如下所示。

[hadoop@hadoop01 bin]$ ./taskmanager.sh startStarting taskexecutor daemon on host hadoop01.[hadoop@hadoop02 bin]$ ./taskmanager.sh startStarting taskexecutor daemon on host hadoop02.[hadoop@hadoop03 bin]$ ./taskmanager.sh startStarting taskexecutor daemon on host hadoop03.

(5)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999flinkflinkflink

(6)查看运行结果
通过查看定位到作业运行在hadoop01节点,此时可进入Flink的log目录查看输出结果,具体操作如下所示。

[hadoop@hadoop01 flink]$ cd log/[hadoop@hadoop01 log]$ tail flink-hadoop-taskexecutor-0-hadoop01.out (flink,1)(flink,2)(flink,3)

四、 Flink YARN运行模式

1.YARN运行模式概述
Flink YARN运行模式是Flink利用Hadoop YARN进行资源管理和调度的部署方式。在YARN上,Flink作业动态申请资源,由YARN的ApplicationMaster启动JobManager和TaskManager。它支持会话模式和应用模式:前者允许在会话中提交多个作业,后者为每个作业启动单独的YARN应用。相比Standalone模式,YARN模式提供更高的资源利用率和弹性,支持资源隔离和优先级调度,使Flink作业能更好地共享大型分布式集群资源。
【亲测有效】Flink1.20分布式集群搭建-最新版本_flink 1.20
YARN会话模式下作业提交流程如上图所示。
(1)启动集群
如果没有集群,需要创建一个新的Session模式的集群。首先,将应用的配置文件上传至HDFS,然后通过客户端向YARN提交Flink创建集群的申请,YARN分配资源,在申请的YARN容器中初始化并启动Flink JobManager,在JobManager进程中运行YarnSessionClusterEntrypoint作为集群启动的入口,初始化Dispatcher、ResourceManager。启动相关的RPC服务,等待客户端通过Rest接口提交作业。

(2)提交作业
YARN集群准备好之后,开始提交作业。
1)Flink客户端通过Rest向Dispatcher提交作业。
2)Dispatcher是Rest接口,不负责实际的调度、执行方面的工作,当收到作业之后,为作业创建一个JobMaster,将工作交给JobMaster(负责作业调度、管理作业和Task的生命周期)。
3)JobMaster向Flink ResourceManager申请资源,开始调度作业的执行;初次提交作业,集群尚没有TaskManager,此时资源不足,开始申请资源。
4)Flink ResourceManager收到JobMaster的资源请求,如果当前有空闲的Slot,则将Slot分配给JobMaster,否则Flink ResourceManager将向YARN ResourceManager请求创建TaskManager。
5)Flink ResourceManager将资源请求加入等待请求队列,并通过心跳向YARN ResourceManager申请新的Container资源来启动TaskManager进程;YARN分配新的Container给TaskManager。
6)Flink ResourceManager从HDFS加载Jar文件等所需的相关资源,在容器中启动TaskManager。
7)TaskManager启动之后,向Flink ResourceManager进行注册,并把自己的Slot资源情况汇报给Flink ResourceManager。
8)Flink ResourceManager从等待队列中取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给了哪个JobMaster。
9)TaskManager向JobMaster提供Slot,JobMaster将Task调度到TaskManager的Slot上执行。

2.配置YARN运行模式的集群
Flink On YARN运行模式的运行依赖与Hadoop集群(需要提前搭建好Hadoop集群),不需要配置Flink集群,只需要配置一个客户端提交Flink作业即可。
(1)配置环境变量
由于节点资源有限,这里我们选择hadoop01节点作为Flink客户端。hadoop01节点需要增加Hadoop环境变量,才能将Flink作业提交到YARN集群,具体操作如下所示。

[hadoop@hadoop01 ~]$ vi ~/.bashrcJAVA_HOME=/home/hadoop/app/jdkHADOOP_HOME=/home/hadoop/app/hadoopCLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarPATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATHexport JAVA_HOME CLASSPATH PATH HADOOP_HOMEexport HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoopexport HADOOP_CLASSPATH=`hadoop classpath`#使配置生效[hadoop@hadoop01 flink]$ source ~/.bashrc

(2)启动Zookeeper集群
分别在hadoop01、hadoop02和hadoop03节点进入Zookeeper安装目录,启动Zookeeper服务,具体操作如下所示。

[hadoop@hadoop01 zookeeper]$ bin/zkServer.sh start[hadoop@hadoop02 zookeeper]$ bin/zkServer.sh start[hadoop@hadoop03 zookeeper]$ bin/zkServer.sh start

(3)启动Hadoop集群
在hadoop01节点,进入Hadoop安装目录分别启动HDFS和YARN集群,具体操作如下所示。

[hadoop@hadoop01 hadoop]$ sbin/start-dfs.sh[hadoop@hadoop01 hadoop]$ sbin/start-yarn.sh

3.会话模式部署
YANR的会话模式与Standalone集群略有不同,首先需要申请一个YARN会话即YARN Session,来启动Flink集群。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999

(2)启动YARN会话
在hadoop01节点,进入Flink安装目录,启动YARN会话,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/yarn-session.sh -nm wordcountFound Web Interface hadoop03:45753 of application \'application_1734486011928_0001\'.JobManager Web Interface: http://hadoop03:45753

核心参数说明。
-d:分离模式,可以使YARN Session在后台运行。
-jm(–jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(–name):配置在YARN UI界面上显示的任务名。
-qu(–queue):指定YARN队列名。
-tm(–taskManager):配置每个TaskManager所使用内存。
如上打印信息所示。YARN Session启动之后,会生成一个YARN application ID和一个Web UI地址。

(3)提交作业
Flink可以通过Web UI界面或者命令行来提交作业,由于Web UI界面操作较为简单,我们重点讲解通过命令行来提交Flink作业。
在hadoop01节点,新打开一个控制台,进入Flink安装目录,提交WordCount作业,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/flink run -c com.yangjun.WordCount /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar

Flink客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到。

(4)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999flinkflinkflink

(5)查看运行结果
通过查看定位到作业运行在hadoop02节点,此时可进入Hadoop安装路径下的logs/userlogs目录查看输出结果,具体操作如下所示。

[hadoop@hadoop02 container_e42_1734486011928_0001_01_000002]$ tail taskmanager.out (flink,1)(flink,2)(flink,3)[hadoop@hadoop02 container_e42_1734486011928_0001_01_000002]$ pwd/home/hadoop/app/hadoop/logs/userlogs/application_1734486011928_0001/container_e42_1734486011928_0001_01_000002

4.应用模式部署
在Flink On YARN运行模式中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的Flink作业,从而启动一个Flink集群。
(1)启动NetCat服务
在hadoop01节点上,启动一个监听在9999端口上的NetCat服务端,等待Flink WordCount应用的连接并接收数据,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999

(2)提交作业
在hadoop01节点,进入Flink安装目录,直接提交WordCount作业,具体操作如下所示。

[hadoop@hadoop01 flink]$ bin/flink run-application -t yarn-application -c com.yangjun.WordCount /home/hadoop/shell/jar/learningFlink1.20-1.0-SNAPSHOT.jar......YARN application has been deployed successfully.Found Web Interface hadoop02:35108 of application \'application_1734486011928_0002\'.

(3)输入测试数据
在hadoop01节点上,通过NetCat服务端输入测试数据集,具体操作如下所示。

[root@hadoop01 ~]# nc -lk 9999flinkflinkflink

(4)查看运行结果
通过查看定位到作业运行在hadoop02节点,此时可进入Hadoop安装路径下的logs/userlogs目录查看输出结果,具体操作如下所示。

[hadoop@hadoop02 container_e42_1734486011928_0002_01_000002]$ tail taskmanager.out (flink,1)(flink,2)(flink,3)[hadoop@hadoop02 container_e42_1734486011928_0002_01_000002]$ pwd/home/hadoop/app/hadoop/logs/userlogs/application_1734486011928_0002/container_e42_1734486011928_0002_01_000002

到这里为止,我们已经成功完成了Flink On Standalone集群和Flink On YANR集群的搭建,同时分别进行了会话模式和应用模式的作业部署。