irpas技术客

FLINK 1.12.2 流式写入HDFS(hive)的几种方式_arwenlin_flink写入hdfs

网络 2365

Flink 1.12.2 写入hdfs有3种方式,依照api出现的先后依次介绍,重点介绍Flink SQL on Hive的方式。

目录

1 streaming file sink

2 FileSink

3 Flink SQL on Hive

3.1添加依赖

3.2 配置Hive Catalog及使用Flink终端访问Hive

3.3 代码调用Flink SQL写入Hive

4 总结


1 streaming file sink

1.7版Flink开始支持通过StreamingFileSink实现写入hdfs,支持exactly-once语义,基于checkpoint实现两阶段提交(即需要设置checkpoint)。一般应用于实时数仓、topic拆分、基于小时的分析处理等。

StreamingFileSink提供了2个写入API:

forRowFormat方法,把读到的信息按照行存储的格式写入hdfs上,官网上有例子。forBulkFormat方法,指定其他的存储格式,例如:parquet,Avro,ORC等等。

Flink 提供了两个分桶策略:

BasePathBucketAssigner,不分桶,所有文件写到根目录;DateTimeBucketAssigner,基于系统时间(yyyy-MM-dd–HH)分桶。

StreamingFileSink的滚动策略有2种,滚动策略实际上就是flink何时写文件的方式:

(默认)滚动策略生成器DefaultRollingPolicy:当超过最大桶大小(默认为 128 MB),或超过了滚动周期(默认为 60秒),或未写入数据处于不活跃状态超时(默认为 60 秒)的时候,滚动文件;checkpoint滚动策略生成器OnCheckpointRollingPolicy:?当 checkpoint 的时候,滚动文件。

需要注意:

Flink的分桶意义与HDFS不同,Flink的分桶指的是将文件放在不同的文件夹中,相当于HDFS分区的概念当使用forRowFormat方法时,我们可以手动指定滚动策略(包括多久生成新文件,文件达到多大生成新文件等)当使用forBulkFormat方法时,我们只能选择OnCheckpointRollingPolicy的滚动策略 2 FileSink

1.12版Flink新增FileSink实现流批一体写入文件系统,包括写入hdfs,同样支持exactly-once语义,基于checkpoint实现两阶段提交(即需要设置checkpoint)。FileSink就是升级版的streamingFileSink,支持的滚动策略和分桶策略与StreamingFileSink一致。

3 Flink SQL on Hive

Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,本文以Flink1.12版本为例。

3.1添加依赖

使用Flink SQL需要引入Flink Table API的依赖:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency>

集成Hive需要额外添加一些依赖jar包,这样才能通过 Table API 或 SQL Client 与 Hive 进行交互。

<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>${hive.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-metastore</artifactId> <version>${hive.version}</version> </dependency>

另外,Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency>

Flink集群还需要配置好HADOOP_CLASSPATH(windows下IDE中调试运行也需要hadoop环境,具体可见《使用问题记录》)。这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

配置HADOOP_CLASSPATH,需要在/etc/profile文件中配置如下的环境变量:

export HADOOP_CLASSPATH=`hadoop classpath`

Flink官网提供了两种方式添加Hive的依赖项。第一种是使用 Flink 提供的 Hive Jar包(根据使用的 Metastore 的版本来选择对应的 Hive jar),建议优先使用Flink提供的Hive jar包。如果你使用的Hive版本与Flink提供的Hive jar包兼容的版本不一致,你可以选择第二种方式,即别添加每个所需的 jar 包。具体版本说明和jar包下载见官网https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/

我们使用的hive版本是3.1.2,hadoop版本是3.1.0,所以直接下载官网提供的flink-sql-connector-hive-3.1.2.jar,并引入依赖中:

<dependency> <groupId>org.apache.thrift</groupId> <artifactId>libfb303</artifactId> <version>0.9.3</version> <type>pom</type> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.11</artifactId> <version>${flink.version}</version> </dependency>

以上依赖包全都需要放到Flink集群的/lib目录下,并重启Flink集群。

为避免程序提交到Flink集群时出现jar包冲突的问题,建议依赖性设置为provided。

3.2 配置Hive Catalog及使用Flink终端访问Hive

配置sql-client-defaults.yaml,该文件是Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,可以看到默认catalog为空。修改内容如下:

#============================================================================== # Catalogs #============================================================================== # Define catalogs here. catalogs: # A typical catalog definition looks like: - name: myhive type: hive hive-conf-dir: /home/hive-3.1.2/conf #Hive的conf目录 default-database: flink_onhive_test #默认使用Hive数据库

此时启动Flink SQL Client就可以访问Hive了,Flink 终端启动命令:

./bin/sql-client.sh embedded Flink SQL> show catalogs; default_catalog myhive Flink SQL> use catalog myhive; Flink SQL> show databases; default flink_onhive_test Flink SQL> use flink_onhive_test; Flink SQL> show tables; hive_table_test 3.3 代码调用Flink SQL写入Hive

设置环境变量,否则默认使用系统用户名登陆hdfs,有可能报访问权限的错

System.setProperty("HADOOP_USER_NAME","root");

创建HiveCatalog

String name = "myhive"; // Catalog名字 String defaultDatabase = "flink_onhive_test"; //默认数据库 String hiveConfDir = "/kafkatest/hiveconf"; // hive配置文件的目录.需要把hive-site.xml添加到该目录,目前只认本地文件系统 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);

创建onhive执行环境

EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() //hive要求使用 .inStreamingMode() //提供streaming方式,可以不设置,默认是批量 .build();

使用StreamExecutionEnvironment创建StreamTableEnvironment,必须设置StreamExecutionEnvironment的checkpoint

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env, settings ); Configuration configuration = tableEnv.getConfig().getConfiguration(); configuration.setString("table.exec.hive.fallback-mapred-reader", "true");

或者直接使用TableEnvironment的话需要直接设置tenv的checkpoint

TableEnvironment tableEnv = TableEnvironment.create(settings); configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE); configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(2));

注册HiveCatalog

把HiveCatalog: myhive作为当前session的catalog

tableEnv.registerCatalog(name, hive); tableEnv.useCatalog(name); tableEnv.useDatabase(defaultDatabase);

指定方言

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

建表sql以hive为目的地

tableEnv.executeSql("drop table if exists t_kafkaMsg2hiveTable"); tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t_kafkaMsg2hiveTable (" + "ip STRING," + "msg STRING" + ")" + " PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" + " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," // hive 分区提取器提取时间戳的格式 + " 'sink.partition-commit.trigger'='partition-time'," // 分区触发提交的类型可以指定 "process-time" 和 "partition-time" 处理时间和分区时间 + " 'sink.partition-commit.delay'='0s'," // 提交延迟 + " 'sink.partition-commit.policy.kind'='metastore,success-file'" // 提交类型 + ")");

插入数据

tableEnv.executeSql("INSERT INTO t_kafkaMsg2hiveTable " + "SELECT ip,msg,DATE_FORMAT(ts3, 'yyyy-MM-dd'), DATE_FORMAT(ts3, 'HH') FROM t_KafkaMsgSourceTable");

?

4 总结 Flink SQL结合HiveCatalog利用Sql Client创建好source、sink表,程序只需要关心逻辑SQL,无需关注source、sink表的创建过程。Flink SQL无需将代码提交到集群调试,SQL调试更方便。Flink SQL提高代码复用性以及可读性、减少维护成本。Flink SQL批流一体化,更便捷实现批流join的需求。

?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #flink写入hdfs #Flink #1122 #SQL #on #Hive的方式 #目录1