更新于:

flink安装文档

分布式部署flink on yarn

部署standalone模式

  1. 解压
1
2
3
tar -zxvf /opt/software/flink-1.10.2-bin-scala_2.11.tgz -C /opt/module

mv /opt/module/flink-1.10.2 /opt/module/flink
  1. 修改配置文件,conf目录下
1
2
cd /opt/module/flink/conf/
vi flink-conf.yaml

flink-conf.yaml 更改下列内容

1
jobmanager.rpc.address: master

修改 masters 文件

1
vi masters

masters 更改下列内容

1
master:8081

修改 slaves 文件

1
vi slaves

slaves 更改下列内容

1
2
slave1
slave2
  1. 配置环境变量
1
vi /etc/profile

profiles 添加下列内容

1
2
export FLINK_HOME=/opt/module/flink
export PATH=$PATH:$FLINK_HOME/bin

刷新生效

1
source /etc/profile
  1. 分发文件
1
2
3
4
5
scp -r /opt/module/flink slave1:/opt/module/
scp -r /opt/module/flink slave2:/opt/module/

scp /etc/profile slave1:/etc/profile
scp /etc/profile slave2:/etc/profile

刷新其他节点的profile文件

1
2
source /etc/profile
source /etc/profile
  1. 启动集群standalone模式
1
start-cluster.sh
  1. 打开浏览器输入http://192.168.152.115:8081

image.png

部署Yarn模式

  1. YARN模式(包括session模式和per-job模式)

YARN模式是使用YARN做为Flink运行平台,JobManager、TaskManager、用户提交的应用程序都运行在YARN上。

关闭standalone模式集群

1
stop-cluster.sh
  1. hadoop环境变量引入,
1
vi /etc/profile

在用户环境变量中添加

1
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
1
2
scp /etc/profile slave1:/etc/profile
scp /etc/profile slave2:/etc/profile
1
2
source /etc/profile
source /etc/profile
  1. 修改 flink-conf.yaml 配置文件
1
2
cd /opt/module/flink/conf
vi flink-conf.yaml

关于容错配置,checkpoints存储方式

1
2
3
4
5
 state.backend: filesystem

 state.checkpoints.dir: hdfs://master:9000/flink-checkpoints

 state.savepoints.dir: hdfs://master:9000/flink-savepoints
  1. 将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar上传到flink的lib目录

https://www.cnblogs.com/ryxiong-blog/p/14638422.html

1
2
cd /opt/module/flink/lib
cp /opt/software/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar . /

分发文件之前,要删除slave1,slave2上面的flink文件夹

1
2
[root@slave1 ~]cd /opt/module
[root@slave1 module]rm -r -f flink

分发文件

1
2
3
4
scp -r /opt/module/flink slave1:/opt/module
scp -r /opt/module/flink slave2:/opt/module
scp /etc/profile slave1:/etc/profile
scp /etc/profile slave2:/etc/profile

刷新变量

1
2
source /etc/profile
source /etc/profile
  1. 启动hadoop集群

为了防止YARN可能将flink任务Kill掉,可以在Hadoop配置文件yarn-site. xml中添加如下内容,3台机器都要修改

1
2
cd /opt/module/hadoop/etc/hadoop
vi yarn-site.xml

添加如下内容

1
2
3
4
5
6
7
8
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

(1)3台主机上启动zookeeper

1
2
zkServer.sh start
zkServer.sh status

(2)启动hdfs

1
start-dfs.sh

(3)启动yarn

1
start-yarn.sh
  1. yarn-session模式运行flink

将Flink以服务形式长期运行在YARN,Job运行完也不会关闭,但是这种方式启动时资源已经指定,多个Job运行共享资源

1
yarn-session.sh -d -s 2 -jm 1g -tm 1g

命令输出最后包含了停止命令

1
yarn application -kill application_1650849090889_0001
1
2
3
4
5
6
7
8
[root@master hadoop]jps
10640 QuorumPeerMain
16163 Jps
15269 ResourceManager
16038 YarnSessionClusterEntrypoint
14920 NameNode
15496 NodeManager
15020 DataNode

yarn-session的参数介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
  -n: 指定TaskManager的数量;
  -d:以分离模式运行;
  -id:指定yarn的任务ID;
  -j:Flink jar文件的路径;
  -jm:JobManager容器的内存(默认值:MB);
  -nl:为YARN应用程序指定YARN节点标签;
  -nm:在YARN上为应用程序设置自定义名称;
  -q:显示可用的YARN资源(内存,内核);
  -qu:指定YARN队列;
  -s:指定TaskManager中slot的数量;
  -st:以流模式启动Flink;
  -tm:每个TaskManager容器的内存(默认值:MB);
  -z:命名空间,用于为高可用性模式创建Zookeeper子路径;

启动作业

1
2
3
4
5
flink run /opt/module/flink/examples/batch/WordCount.jar

flink run /opt/module/flink/examples/batch/WordCount.jar  --input hdfs://master:9000/aaa.txt  --output hdfs://master:9000/out/result.txt

hadoop fs -ls /out
1
2
3
Found 1 items

-rw-r--r--   3 root supergroup         25 2022-04-24 14:33 /out/result.txt

查看结果

1
hadoop fs -cat /out/result.txt
1
2
3
4
5
6
flink 1
hadoop 1
hdfs 1
hello 5
spark 1
yarn 1

将yarn-session停止掉

1
2
3
yarn application -kill (applicationId)

yarn application -kill application_1650849090889_0001
  1. per-job模式运行flink

启动作业

1
flink run -m yarn-cluster -yjm 1024 -ytm 2048 -ys 2 -p 3 /opt/module/flink/examples/batch/WordCount. jar
1
flink run -m yarn-cluster -yjm 1024 -ytm 2048 -ys 2 -p 1 /opt/module/flink/examples/batch/WordCount. jar --input hdfs://master:9000/aaa. txt  --output hdfs://master:9000/out1/result. txt

查看结果

1
hadoop fs -cat /out1/result. txt
1
2
3
4
5
6
flink 1
hadoop 1
hdfs 1
hello 5
spark 1
yarn 1

flink run参数介绍:

1
2
3
4
5
6
-c:如果没有在jar包中指定入口类,则需要在这里通过这个参数指定;
-m:指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager,可以说是yarn集群名称;
-p:指定程序的并行度。可以覆盖配置文件中的默认值;
-n:允许跳过保存点状态无法恢复。 你需要允许如果您从中删除了一个运算符你的程序是的一部分保存点时的程序触发;
-q:如果存在,则禁止将日志记录输出标准出来;
-s:保存点的路径以还原作业来自(例如hdfs:///flink/savepoint-1537);

还有参数如果在yarn-session当中没有指定,可以在yarn-session参数的基础上前面加“y”,即可控制所有的资源,这里就不獒述了

  1. 关闭hadoop集群

(1)在虚拟机master执行“ stop-yarn.sh ”命令,关闭YARN。

(2)在虚拟机master执行“ stop-dfs.sh ”命令,关闭HDFS。

(3)3台机器上,关闭zookeeper

1
zkServer.sh stop

参考链接