irpas技术客

Flink简介、部署模式及其区别_MelodyYN_flink 部署模式区别

未知 4840

文章目录 1、Flink简介2、Flink部署2.1 Standalone模式部署2.2 Standalone模式下的高可用2.3 Yarn模式Yarn模式的高可用配置:yarn模式中三种子模式的区别: 3、并行度4、提交命令执行指定任务5、注意事项

1、Flink简介

? Spark 和 Flink 一开始都都希望能够用同一个技术把流处理和批处理统一起来,但他们走了完全不一样的两条路。前者是以批处理的技术为根本,并尝试在批处理之上支持流计算;后者则认为流计算技术是最基本的,在流计算的基础之上支持批处理。通过Flink和Spark的对比来说:

SparkFlink流批世界观一切都是由批次组成。离线数据是一个大批次;而实时数据是由一个一个无限的小批次组成的。一切都是由流组成。离线数据是有界限的流;实时数据是一个没有界限的流。计算模型微批处理模型(秒级)连续流模型(毫秒级)驱动时间驱动型:主动拉取数据,(即使没有数据,到达一定时间,也会去计算,浪费资源)事件驱动型:被动拉取数据,(如果没数据的时候什么也不干,节省资源)
2、Flink部署 开发模式(idea)本地模式(零配置)Standalone模式Yarn模式 Session-ClusterApplication ModePer-Job-Cluster 2.1 Standalone模式部署

配置文件flink-conf.yaml

jobmanager.rpc.address: hadoop102

2.workers

hadoop102 hadoop103 hadoop104

分发至其他节点

启动集群

bin/start-cluster.sh

提交命令执行任务

bin/flink run -m hadoop102:8081 -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-learn-1.0-SNAPSHOT.jar

通过8088端口访问WebUI

2.2 Standalone模式下的高可用

? 任何时候都有一个主 JobManager和多个备用 JobManagers,以便在主节点失败时有备用 JobManagers 来接管集群。这可以避免单点故障,一旦备 JobManager 接管集群,作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager都可以充当主备节点。

修改配置文件flink-conf.yaml

high-availability: zookeeper high-availability.storageDir: hdfs://hadoop102:8020/flink/standalone/ha high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181 high-availability.zookeeper.path.root: /flink-standalone high-availability.cluster-id: /cluster_hpu

masters

hadoop102:8081 hadoop103:8081

分发至其他节点

修改环境变量myenv.sh,并分发source

export HADOOP_CLASSPATH=`hadoop classpath`

启动flink集群

先查看通过zookeeper客户端查看哪个是master,然后kill掉master进行测试

zkCli.sh get /flink-standalone/cluster_hpu/leader/rest_server_lock 2.3 Yarn模式

仅需配置/etc/profile.d/my.sh中配置并分发

export HADOOP_CLASSPATH=`hadoop classpath` Yarn模式的高可用配置:

Standalone模式中, 同时启动多个Jobmanager, 一个为leader其他为standby的, 当leader挂了, 其他的才会有一个成为leader。

yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用。

yarn-site.xml

<property> <name>yarn.resourcemanager.am.max-attempts</name> <value>4</value> <description> The maximum number of application master execution attempts. </description> </property>

flink-conf.yaml

yarn.application-attempts: 3 high-availability: zookeeper high-availability.storageDir: hdfs://hadoop102:8020/flink/yarn/ha high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181 high-availability.zookeeper.path.root: /flink-yarn

启动yarn-session

杀死Jobmanager,查看复活情况

注意: yarn-site.xml中是复活次数的上限, flink-conf.xml中的次数应该小于这个值。

测试过程中会发现一直kill不掉jobManager,是因为除了重试次数这个机制外,还有一个时间的机制(Akka超时时间),如果在一定的时间(这个时间很短)内jobManager重新拉取了几次还是挂掉的话,那就会真正的挂掉。

yarn模式中三种子模式的区别: Session模式:适合需要频繁提交的多个小job,并且执行时间都不长,因为flink会在yarn中启动一个session集群,这个集群主要用来申请资源的,后续提交的其他作业,都会直接提交到这个session集群中,不需要频繁创建flink集群,这样效率会变高,但是,作业之间相互不隔离。

per-job模式:适合规模大长时间运行的作业。每次提交job都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

application Mode模式:每提交一个任务(application)可能会包含多个job,一个application对应一个flink集群,main方法是在集群中运行。

application Mode模式存在bug不使用。

bug:每个job的id都为0000000,而checkpoint依赖于id命名在hdfs集群上进行存储。这将导致错误发生。

3、并行度

并行度优先级:

算子指定>env全局指定>提交参数>配置文件

slot个数与并行度的关系

默认情况下,slot个数等于流程序的并行度(程序中最大算子的并行度) 在有多个共享组时,slot个数等于每个共享组中最大算子并行的和

4、提交命令执行指定任务

flink提交任务脚本参数: flink 类似于spark-submit用于提交作业 run 用来执行作业(除了applicationMode模式不需要) run-application (applicaitonMode模式执行作业的命令) -t yarn模式中指定以yarn哪种模式运行的参数 -d 后台提交(断开与客户端的连接) -m 指定JobManager以及UI端口 -D 指定其他参数。比如多队列提交参数(-Dyarn.application.queue=hive) -c 指定全类名

举例:

本地模式

bin/flink run -m hadoop102:8081 -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

standalone模式

bin/flink run -m hadoop102:8081 -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

yarn模式

per-job:

bin/flink run -d -t yarn-per-job -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

提交任务到Yarn的其他队列

bin/flink run -d -m yarn-cluster -yqu hive -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar(老版本) bin/flink run -d -t yarn-per-job -Dyarn.application.queue=hive -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

session-cluster:

bin/yarn-session.sh -d bin/flink run -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

如果是1.12版本开启了Yarn模式的高可用,上面指定yarn-session集群的命令不能用,需要去掉 -t yarn-session (1.13版本已修复)

bin/flink run -Dyarn.application.id=application_XXXX_YY -c com.hpu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar

application mode:

bin/flink run-application -t yarn-application -c com.atguigu.flink.java.chapter_2.Flink03_WC_UnBoundedStream ./flink-prepare-1.0-SNAPSHOT.jar 5、注意事项

在java语法的flink编程中调用一个方法,有以下三种实现方式

自定义一个类实现接口 √写接口的匿名实现类 √写Lambda表达式

注意:在写Lambda表达式的时候,可能会因为类型擦除的原因报错,解决方式如下 在方法的最后调用.returns(Types.类型)解决 比如: SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOneDStream = wordDStream.map(value -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING,Types.INT));


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #部署模式区别 #