flink on yarn集群搭建及验证要点记录

standalone模式的弊端

前面搭建了flink简单集群,并在此基础上又搭建了HA集群,记录地址如下:

flink初识及集群搭建和简单验证

flink-HA集群搭建和问题记录

虽然上述两种都能使用,在学习flink-api阶段应该是够用了,但是如果真要上生产使用,就还是有一定的弊端。

根据之前的学习可知,flink集群主要分为jobManagertaskManager,而jobManger的任务主要有两个,一个是资源管理,另一个是任务调度

这样一来,jobManager的任务其实就显得有点多,而又由于mapReduce、spark、flink的集群都能基于yarn管理资源,所以有一个更好的方式就是让flink集群运行在yarn上。

yarn是hadoop项目自带的模块,启动hadoop集群会一起启动yarn,所以也不用单纯的再维护yarn,同时,yarn的资源管理也能分担jobManager的资源管理,使得这种模式下的jobManager基本只需要专注于任务调度,也能进一步提高可用性。

之所以说是基本专注,是因为它不直接管理资源,但是还是要为taskManager向yarn申请资源,而这里说的yarn管理资源,实际上就是之前hdfs搭建时里边所说的resourceManager,可以参考之前的hdfs搭建记录:

HDFS-HA模式搭建(基于完全分布式模式升级)hadoop分布式安装及配置初步解析(坑坑不息)

单纯的使flink集群运行在yarn上,不需要额外的配置,只需要使用yarn命令启动flink集群即可,但是使用方式有两种,一种是提交任务的同时启动集群,另一种是先启动集群再提交运行任务。

前一种方式每次提供运行任务时启动集群,关闭任务也会关了集群,后一种则是一直保持jobManager开启,taskManager按需启动,因此后一种相对更加常用,这种方式分为两步:

步骤一,在yarn上启动flink集群,例如:

1
yarn-session.sh -n 3 -s 3 -nm flink-yarn-test -d

上边参数的意思是:-n代表最多启动的taskManager数量,-s代表每个taskManager中最多分配的slot数量,-nm代表自定义的flink集群名称,-d代表后台运行。

步骤二,提交flink任务,这里其实又可以分为两种方式,一种是在web界面提交,一种是命令行,命令行提交示例如下:

1
flink run -c com.tzx.study.demo.flink.FlinkTest -yid application_1586794520478_0007 tzx-study-demo.jar

这里-c指定flink程序启动类的路径,-yid指定任务要提交到的flink(yarn)集群的id,末尾是jar包路径,我是在jar包所在目录执行的命令,因此就没有其他前缀。

这里需要注意的是,-yid可以不指定,默认会提交到最新启动的flink(yarn)集群中。

注:上述操作若有问题,请继续往下看。上述操作是理论操作,因为HA方式也是一样,因此我直接在HA中操作,因此有些问题会直接记录在下边。

上边的方式把flink运行在yarn上,有一个问题在于jobManager一样是单机的,也会有单点故障,因此正常的生产环境也应该是要使用HA方式。

yarn中的flink的HA,实际是增加jobManager的故障重试次数,进而使得原本运行的jobManager出现问题后,yarn能够再启动一个新的jobManager,从而提高整个flink集群的可用性,这个配置在hadoop中的yarn-site.xml文件中,增加如下配置:

1
2
3
4
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>10</value>
</property>

有了上述配置之后,一次启动zookeeper集群、hdfs集群、flink集群,但是执行yarn-session启动flink集群时却出错了,错误如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2020-10-16 09:52:05,326 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Error while running the Flink session.
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:382) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:514) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:751) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_261]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_261]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:751) [flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1602812938780_0001 failed 2 times in previous 10000 milliseconds (global limit =10; local limit is =2) due to AM Container for appattempt_1602812938780_0001_000003 exited with exitCode: 127
Failing this attempt.Diagnostics: [2020-10-16 09:52:04.913]Exception from container-launch.
Container id: container_1602812938780_0001_03_000001
Exit code: 127

[2020-10-16 09:52:04.920]Container exited with a non-zero exit code 127. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

[2020-10-16 09:52:04.931]Container exited with a non-zero exit code 127. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

For more detailed output, check the application tracking page: http://node001:18088/cluster/app/application_1602812938780_0001 Then click on links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1602812938780_0001
at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:375) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
... 7 more

上述提示内容没有看出实质性的问题,但是却发现了如下两行:

1
2
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1602812938780_0001

不用完全看懂也能大概知道要进一步查问题,需要执行yarn logs -applicationId application_1602812938780_0001命令,但是直接执行会发现也看不到什么内容,反而有如下错误提示:

1
2
3
4
5
2020-10-16 10:07:48,478 INFO client.RMProxy: Connecting to ResourceManager at node001/192.168.139.91:18040
File /tmp/logs/root/logs-tfile/application_1602812938780_0001 does not exist.

Can not find any log file matching the pattern: [ALL] for the application: application_1602813970991_0001
Can not find the logs for the application: application_1602813970991_0001 with the appOwner: root

这个问题其实上边有说明了,使用上述命令的前提是集群中开启聚合日志,因此需要在yarn-site.xml中再增加一个配置开启这个日志:

1
2
3
4
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

重启hdfs集群后,再执行yarn-session来启动flink集群,还是会出现启动失败的提示,但是再次执行yarn logs就有了更多的日志信息,其中有一段如下:

1
2
3
4
5
6
7
8
9
10
Container: container_1602813970991_0001_02_000001 on node002_40809
LogAggregationType: AGGREGATED
==================================================================
LogType:jobmanager.err
LogLastModifiedTime:Fri Oct 16 09:52:05 +0800 2020
LogLength:48
LogContents:
/bin/bash: /bin/java: No such file or directory

End of LogType:jobmanager.err

有了这段提示,原因就显而易见了,这是很多地方的常见问题,找不到JAVA_HOME

但是为啥找不到呢,联想一下之前的hdfs集群搭建,想起来hdfs集群启动的时候,会从一台机ssh到另一台机启动shell进而启动该节点。

而这个shell不会加载/etc/profile文件,也就导致会找不到JAVA_HOME

根据之前在hadoop-env.sh中增加JAVA_HOME配置的经验,猜想应该是yarn需要一样的处理,于是在yarn-env.sh中加入了JAVA_HOME的配置:

1
export JAVA_HOME=/opt/java/jdk1.8.0_261

需要注意,以上所有配置的修改,均需要同步分发到所有节点。

web ui

之前在standalone模式中,可以直接访问8081端口查看jobManagerweb ui,在这里界面能看到很多信息,也能直接提交flink的任务,而flink运行在yarn上,一样有web ui,只不过有两个。

其中一个可以通过yarnweb ui跳转,另一个则是直接访问,但是这两个有所区别。

通过yarn跳转的,默认只能看一些信息,不能提交任务,而直接访问的,和原来8081访问的效果一样。

上边配置好JAVA_HOME后,再重启hadoop和yarn下的flink集群,会看到不再报错,并且最后带出了jobManager的ui地址:

1
2
2020-10-16 10:33:15,488 INFO  org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED
JobManager Web Interface: http://node003:37917

上边这个http://node003:37917就是我现在可直接访问的,等同之前8081访问的jobManagerweb ui

而使用yarn跳转方式,需要先访问yarn的web ui,默认端口是8088,而我搭建的时候配置的是18088

其他要点

standalone模式的HA集群中,有两个重要文件,mastersworkers,这两个文件决定了jobManagertaskManager节点。

而在yarn中,资源是yarn管理,这两个文件实际是无效的,我试过清空这两个文件的内容,对运行没有任何影响。

但是和standalone模式一样的是,在系统环境变量中依然需要有下边的配置存在:

1
2
3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export 

HADOOP_CLASSPATH=`hadoop classpath`

推荐文章