下载地址:https://flink.apache.org/downloads.html
提示:我的flink只下载在一台客户端机器上,使用hive用户运行,现存的大数据测试机器不需要做其他配置与改动。
解压配置flink ha
[root@client1 flink]# pwd
/hadoop/flink
[root@client1 flink]# tar xf flink-1.14.0-bin-scala_2.11.tgz
[root@client1 flink]# cd flink-1.14.0/
[root@client1 flink-1.14.0]# vim conf/flink-conf.yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: name1.xxx.com.cn:2181,name2.xxx.com.cn:2181,node1.xxx.com.cn:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs://name1.xxx.com.cn:8020/flink/ha/
配置环境变量
[root@client1 flink-1.14.0]# vim /etc/profile
export FLINK_HOME=/hadoop/flink/flink-1.14.0/
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$FLINK_HOME/bin:$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
[root@client1 flink-1.14.0]# source /etc/profile
测试
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
./bin/flink run-application -t yarn-application ./examples/batch/WordCount.jar
作业提交
bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10 \
-Dyarn.application.name="MyFlinkApp" \
/path/to/my/flink-app/MyFlinkApp.jar
# List running job on the cluster,查看运行的job
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# Cancel running job,取消运行的job
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
那么如何解决传输依赖项造成的带宽占用问题呢?Flink作业必须的依赖是发行版flink-dist.jar,还有扩展卡(位于$FLINK_HOME/lib)和插件库(位于$FLINK_HOME/plugin),我们将它们预先上传到像HDFS这样的共享存储,再通过yarn.provided.lib.dirs参数指定存储的路径即可。
-Dyarn.provided.lib.dirs="hdfs://myhdfs/flink-common-deps/lib;hdfs://myhdfs/flink-common-deps/plugins"
这样所有作业就不必各自上传依赖,可以直接从HDFS拉取,并且YARN NodeManager也会缓存这些依赖,进一步加快作业的提交过程。同理,包含Flink作业的用户JAR包也可以上传到HDFS,并指定远程路径进行提交。
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" hdfs://myhdfs/jars/my-application.jar
官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/security/security-kerberos/
下面使用hive用户启动flink。下面提前准备了hive用户的hive_client1.keytab文件.keytab的生成这里忽略。
[root@client1 flink-1.14.0]# su - hive
[hive@client1 ~]$ cd /hadoop/flink/flink-1.14.0/
[hive@client1 flink-1.14.0]$ ll hive_client1.keytab
-rw------- 1 hive root 738 Oct 13 10:18 hive_client1.keytab
###修改配置文件
[hive@client1 flink-1.14.0]$ vim conf/flink-conf.yaml
security.kerberos.login.keytab: /hadoop/flink/flink-1.14.0/hive_client1.keytab
security.kerberos.login.principal: hive/client1.xxx.com.cn@xxx.com.cn
注意:运行flink的用户身份对high-availability.storageDir: hdfs://name1.xxx.com.cn:8020/flink/ha/ 这个目录有rw权限
[hdfs@name1 ~]$ hadoop fs -mkdir /flink/
[hdfs@name1 ~]$ hadoop fs -chown -R hive /flink/
运行测试任务
[hive@client1 flink-1.14.0]$ ./bin/flink run-application -t yarn-application ./examples/batch/WordCount.jar
#....省略其他输出内容...
2021-10-13 10:31:59,959 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node1.xxx.com.cn:46203 of application 'application_1631779722015_68996'.