irpas技术客

Spark 用 scala 实现读取 hive 表进行读、写等操作_ZeroXu0_scala操作hive

网络投稿 3812

?? spark 目前较为基础且常用的场景应该就是对 hive 表进行读写操作 ,尤其通过使用spark sql 实现数据分析、关联等操作 ??常用的方式是直接采用spark on hive的方式,在创建SparkSession时开启enableHiveSupport。连接上hive之后,可以利用Spark sql对hive表进行读、写、join等操作,官方也推荐Spark sql模式,因为其支持对dataframe(Dataset的特列,DataFrame=Dataset[Row] )进行操作,很多数据分析人员习惯使用python,而python没有dataset,而且sql方式对数据进行批处理方式更为直观。

1.环境准备

?? 这里就不进行赘述,简单说明下:

1)将hive-site.xml拷贝到项目代码的resources目录下2)在hive库中新建测试表;提前准好测试数据,通过load方式将文本数据加载到hive表 加载服务器数据文件sql语句,具体看下面sql 【补充】文件路径有:服务器本地文件、hdfs文件;加载方式:overwrite into(覆盖写)、into(追加写) -- 加载服务器本地文件,以覆盖写的方式 load data local inpath '/home/test_a.txt' overwrite into table zero_test_a partition(partition_name='20220518'); -- 加载hdfs文件,以追加写的方式 load data inpath '/user/hdfs/test_b.txt' into table zero_test_a partition(partition_name='20220518'); 2.实操代码 2.1 spark连接hive

连接hive时候注意开启 enableHiveSupport() 配置,另外本地测试需要加上 master(“local[*]”) 或者 master(“local[]”)

val spark = SparkSession .builder() //本地idea测试时候需要加上 .master("local[*]") .appName("spark_opts_hive") // 开启Hive支持 .enableHiveSupport() //配置metastore.uris .config("hive.metastore.uris","thrift://xxx.xxx.xxx.xxx:9083") // 配置Hivewarehouse地址 .config("hive.metastore.warehouse.dir","/user/hive/warehouse") // 如果设置了账号和密码的需要配置 .config("username","zero") .config("password","zero") .getOrCreate() 2.2 操作hive表

完成连接之后,可以通过spark.sql()方式实现读取、关联等操作,具体请看代码示例。

package com.zero.scala.sparkSql import org.apache.spark.sql.{DataFrame, SparkSession} import scala.collection.mutable.ArrayBuffer /** * sparksql 对hive表进行join * 使用sparksession */ object SparkHiveOps { var sparkSession:SparkSession = _ private def create_sparksession():SparkSession = { val hiveMetaUris = "thrift://xxx.xxx.xxx.xxx:9083" SparkSession.builder().master("local[*]") .appName("createSparkSession") .enableHiveSupport() .config("hive.metastore.warehouse.dir","/user/hive/warehouse") .config("hive.metastore.uris","hiveMetaUris") .getOrCreate() } private def init() :Unit = { sparkSession = create_sparksession() sparkSession.sql("use zeroDb") } private def destroy_sparkSession(ss:SparkSession) : Unit ={ if(ss != null) ss.close() } /** * join */ private def joinOps():DataFrame = { sparkSession.sql("select a.c1,a.c2,b.c2 from zero_test_a a left join zero_test_b b on a.c1=b.c1 where b.c2='xxx'") } def main(args: Array[String]): Unit = { init() joinOps().show(5) destroy_sparkSession(sparkSession) } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #scala操作hive #spark #目前最常用的场景应该就是对 #hive #表进行读写操作 #尤其通过使用spark #SQL