
Spark on MaxCompute: reading a resource file, SQL, DataFrame, and JSON conversion (Spark reading ODPS)


1. Utils class: creating the SparkSession entry point

package com.gwm.utils;

import org.apache.spark.SparkConf;

/**
 * @author yangyingchun
 * @version 1.0
 * @date 2022/10/17 14:32
 */
public class SparkConfUtil {

    static SparkConf sparkConf;

    // Local-mode SparkConf for development; parallelism sets the number of local threads
    public static SparkConf getloaclSparkConf(String appname, String parallelism) {
        sparkConf = (new SparkConf())
                .setAppName(appname)
                .setMaster("local[" + parallelism + "]");
        return sparkConf;
    }

    // Cluster-mode SparkConf; the master is supplied at submit time
    public static SparkConf getSparkConf(String appname) {
        sparkConf = (new SparkConf())
                .setAppName(appname);
        return sparkConf;
    }
}
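The Scala driver in section 3 builds its session through SparkSessionUtil.getParameter / getLocalParameter, which the post does not show. Below is a minimal sketch of what such a helper might look like, assuming the ODPS configuration keys listed in the submit-parameter section at the end of this post; the author's actual implementation may differ.

package com.gwm.utils

import org.apache.spark.sql.SparkSession

// Hypothetical sketch of the SparkSessionUtil used by the driver in section 3.
object SparkSessionUtil {

  // Cluster mode: the session carries the ODPS account and project settings.
  def getParameter(appName: String, accessId: String, accessKey: String,
                   endPoint: String, project: String): SparkSession =
    SparkSession.builder()
      .appName(appName)
      .config("spark.hadoop.odps.access.id", accessId)
      .config("spark.hadoop.odps.access.key", accessKey)
      .config("spark.hadoop.odps.end.point", endPoint)
      .config("spark.hadoop.odps.project.name", project)
      .getOrCreate()

  // Local-mode variant for debugging, mirroring the commented-out call in the driver.
  def getLocalParameter(appName: String, parallelism: String, accessId: String,
                        accessKey: String, endPoint: String, project: String): SparkSession =
    SparkSession.builder()
      .appName(appName)
      .master("local[" + parallelism + "]")
      .config("spark.hadoop.odps.access.id", accessId)
      .config("spark.hadoop.odps.access.key", accessKey)
      .config("spark.hadoop.odps.end.point", endPoint)
      .config("spark.hadoop.odps.project.name", project)
      .getOrCreate()
}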

2. config.properties configuration

## ODPS settings
itsl.gwm_accessId=taUG******574jX
itsl.gwm_accessKey=QTiO*********pV8mqlr9wXFT
itsl.yyc_accessId=9y*********bAKn6PsIr
itsl.yyc_accessKey=yPW***********jTxhemqQzPj8YFMXQ
itsl_dev.gwm_odps_project=itsl_dev
itsl.gwm_odps_project=itsl
dwd.gwm_odps_project=dwd
gwm_odps_driver=com.aliyun.odps.jdbc.OdpsDriver
gwm_odps_url=jdbc:odps:http://******************************d01.odps.ops.cloud.gwm.cn/api?project=ITSL_dev&charset=UTF-8&interactiveMode=true
gwm_odps_endpoint=http://******************************d01.odps.ops.cloud.gwm.cn/api
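The driver reads these values through ConfigPropUtils, which the post does not include either. A minimal sketch, assuming config.properties sits on the classpath (the real utility class may be implemented differently):

package com.gwm.utils

import java.util.Properties

// Hypothetical sketch of ConfigPropUtils: loads config.properties from the classpath once
// and exposes simple key lookups such as ConfigPropUtils.get("gwm_odps_endpoint").
object ConfigPropUtils {
  private val props = new Properties()
  private val in = getClass.getClassLoader.getResourceAsStream("config.properties")
  if (in != null) {
    try props.load(in)
    finally in.close()
  }

  def get(key: String): String = props.getProperty(key)
}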

3. Spark program

package com.gwm.driver

import com.gwm.utils.{ConfigPropUtils, SparkSessionUtil}
import org.apache.spark.sql.{Column, ColumnName, DataFrame, Dataset, Row, SaveMode, SparkSession}

import scala.collection.mutable
import scala.io.Source

/**
 * @author yangyingchun
 * @date 2022/10/18 13:32
 * @version 1.0
 */
object EventOrderSuccessScala {

  var accessId = ConfigPropUtils.get("itsl.yyc_accessId")
  var accessKey = ConfigPropUtils.get("itsl.yyc_accessKey")
  var endPoint = ConfigPropUtils.get("gwm_odps_endpoint")
  var project = ConfigPropUtils.get("itsl.gwm_odps_project")
  val appName = EventOrderSuccessScala.getClass.getSimpleName

  // map holds the configuration read from the uploaded resource file:
  //   querySql:    sql_info
  //   inColumns:   "","",""
  //   outColumns:  ,
  //   outputTable: ${data_date}
  //   accessId: / accessKey: / endPoint: / project:
  var map = new mutable.HashMap[String, String]()

  def main(args: Array[String]): Unit = {
    // Parameters are provided through an uploaded text file
    val targetFile = args(0)
    val data_date = args(1)
    val outputTable = args(2)

    val file = Source.fromFile(targetFile)
    var strings: Iterator[String] = file.getLines()
    for (line <- strings) {
      println(line)
      val strings1 = line.split("\\^")
      map.put(strings1(0), strings1(1))
    }
    file.close()

    // 1. Initialize the programming entry point, overriding defaults with values from the file
    if (map.contains("accessId")) accessId = map.get("accessId").get
    if (map.contains("accessKey")) accessKey = map.get("accessKey").get
    if (map.contains("endPoint")) endPoint = map.get("endPoint").get
    if (map.contains("project")) project = map.get("project").get
    println("accessId:" + accessId)
    println("accessKey:" + accessKey)
    println("endPoint:" + endPoint)
    println("project:" + project)
    // val spark = SparkSessionUtil.getLocalParameter(appName, "*", accessId, accessKey, endPoint, project)
    val spark = SparkSessionUtil.getParameter(appName, accessId, accessKey, endPoint, project)

    // 2. Read the data
    println(map.get("querySql").get.concat(data_date))
    var sql = map.get("querySql").get.concat(data_date)
    var frame: DataFrame = spark.sql(sql)
    import spark.implicits._
    frame.show()

    // 3. Convert the data to JSON
    println(map.get("inColumns").get)
    println(map.get("outColumns").get)
    var inColumns: Array[String] = map.get("inColumns").get.split(",")
    var outColumns: Array[String] = map.get("outColumns").get.split(",")
    import org.apache.spark.sql.functions.get_json_object
    var result: DataFrame = frame.toDF(inColumns: _*)
      .toJSON
      .toDF(outColumns(0))
      .withColumn(outColumns(1), get_json_object($"${outColumns(0)}", "$." + outColumns(1)))
    // .toDF(outColumns:_*)
    result.show()

    // 4. Write the result into the target table
    // result.write.mode(SaveMode.Overwrite).partitionBy("data_date").option("encoding","utf-8").saveAsTable(outputTable)
    result.write.option("partitionOverwriteMode", "dynamic").mode("overwrite").insertInto(outputTable)
  }
}

Write modes and their behavior:

df1.write.mode("overwrite").partitionBy("year").saveAsTable(tableName) -> all partitions are overwritten
df1.write.mode("overwrite").format("Hive").partitionBy("year").saveAsTable(tableName) -> all partitions are overwritten
df1.write.option("partitionOverwriteMode", "dynamic").mode("append").insertInto(tableName) -> dynamic partitioning; appends if the partition exists
df1.write.option("partitionOverwriteMode", "dynamic").mode("overwrite").insertInto(tableName) -> dynamic partitioning; overwrites if the partition exists
INSERT OVERWRITE TABLE $tableName partition (year) SELECT * FROM tmp -> dynamic partitioning; overwrites if the partition exists
INSERT INTO TABLE $tableName partition (year) SELECT * FROM tmp -> dynamic partitioning; appends if the partition exists
df1.write.mode("overwrite").insertInto(tableName) -> dynamic partitioning; overwrites if the partition exists
df1.write.mode("append").insertInto(tableName) -> dynamic partitioning; appends if the partition exists
INSERT OVERWRITE TABLE $tableName partition (year=2024) SELECT * FROM tmp -> overwrites the specified partition
INSERT INTO TABLE $tableName partition (year=2024) SELECT * FROM tmp -> appends to the specified partition
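As a side note (not part of the original post), the following is a minimal, local-mode sketch of the step-3 transformation in the driver above: each row is serialized to a single JSON string with toJSON, and data_date is pulled back out with get_json_object so the result can still be dynamically partitioned. The sample rows are made up.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

// Toy, local-mode illustration of step 3 above; the sample data is hypothetical.
object JsonConversionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JsonConversionDemo").master("local[1]").getOrCreate()
    import spark.implicits._

    // A tiny stand-in for the DataFrame returned by spark.sql(querySql)
    val frame = Seq(
      ("event_order_success", "138****0000", "SO1001", "20221018"),
      ("event_order_success", "139****1111", "SO1002", "20221018")
    ).toDF("event_name", "buyer_phone", "order_sn", "data_date")

    // Same pattern as the driver: whole row -> JSON string, then extract data_date again
    val result = frame.toJSON
      .toDF("json_message")
      .withColumn("data_date", get_json_object($"json_message", "$.data_date"))

    result.show(false)
    spark.stop()
  }
}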

4. Text-file parameters

Each line of the resource file is a key^value pair; note that querySql deliberately ends with data_date= so the driver can append the data_date argument to it.

querySql^select 'event_order_success' event_name,buyer_phone,order_sn,brand_name,car_model,pay_time,paid_amount,data_date from dwd.t_sl_so_oms_order_a_h where mobile is not null and add_time>='2022-03-01 20:00:00' and pay_status = 12 and pay_time IS NOT NULL and data_date=
inColumns^event_name,buyer_phone,order_sn,brand_name,car_model,pay_time,paid_amount,data_date
outColumns^json_message,data_date
accessId^9y*********n6PsIr
accessKey^yPW6***********xhemqQzPj8YFMXQ
endPoint^http://*******************d01.odps.ops.cloud.gwm.cn/api
project^itsl_dev

Submit the job with the following parameters:

1. spark.executor.instances

The total number of executors requested. For ordinary jobs, ten to a few dozen executors are enough; when processing large amounts of data you can request more, e.g. 100 to 2000+.

2. spark.executor.cores

The number of cores per executor. The maximum parallelism of a job is the number of executors multiplied by the number of cores per executor (e.g. 20 executors with 2 cores each allows at most 40 tasks to run concurrently).

3. spark.executor.memory

The amount of memory requested for each executor.

4. spark.yarn.executor.memoryOverhead

The off-heap memory requested for each executor, in MB by default. It is mainly used by the JVM itself and for overhead such as strings and NIO buffers. The total memory of a single executor is spark.executor.memory + spark.yarn.executor.memoryOverhead (e.g. 4g of executor memory plus 1024 MB of overhead gives roughly 5 GB in total).

5. spark.driver.cores

Analogous to spark.executor.cores, but for the driver.

6. spark.driver.memory

Analogous to spark.executor.memory, but for the driver.

7. spark.yarn.driver.memoryOverhead

Analogous to spark.yarn.executor.memoryOverhead, but for the driver.

8. spark.driver.maxResultSize

Defaults to 1g. Limits the size of the results that workers send back to the driver; once the limit is exceeded, the driver aborts execution.

9. spark.hadoop.odps.cupid.disk.driver.device_size

The size of the local disk, 20g by default. If "No space left on device" occurs, increase this value as needed; up to 100g is supported. The value must include the unit 'g'.

10. spark.hadoop.odps.project.name

The MaxCompute project in which the Spark job runs.

11. spark.hadoop.odps.access.id

The accessId used to submit the Spark job.

12. spark.hadoop.odps.access.key

The accessKey used to submit the Spark job.

13. spark.hadoop.odps.end.point

Used for job submission. On the China public cloud this is usually set to http://service.cn.maxcompute.aliyun.com/api.

14. spark.hadoop.odps.runtime.end.point

Used while the job is running. On the China public cloud this is usually set to http://service.cn.maxcompute.aliyun-inc.com/api.

15. spark.hadoop.odps.task.major.version

The platform version currently in use. On the public cloud, setting it to cupid_v2 is sufficient.

16. spark.sql.catalogImplementation

Needs to be set to odps for Spark 2.3; from Spark 2.4 onward it changes to hive. To make jobs easier to migrate, it is recommended not to hard-code this setting in your code.

17. spark.hadoop.odps.cupid.resources

Specifies the MaxCompute resources required by the program, in the format <projectname>.<resourcename>; multiple resources can be listed, separated by commas. The specified resources are downloaded to the working directories of the driver and the executors, and this parameter is commonly used to reference larger files. After download, a resource keeps the default name <projectname>.<resourcename>; to rename it, specify <projectname>.<resourcename>:<new resource name> in the configuration (for example, with a hypothetical resource name, itsl_dev.event_params.txt:event_params.txt).

18. spark.hadoop.odps.cupid.vectorization.enable

Whether to enable vectorized reads and writes. Defaults to true.

19. spark.hadoop.odps.input.split.size

Tunes the parallelism when reading MaxCompute tables. By default each input split (partition) is 256 MB; the unit of this parameter is MB.

20. spark.hadoop.odps.cupid.vpc.domain.list

A parameter required for VPC access; this is the traditional way of accessing a VPC.

21. spark.hadoop.odps.cupid.smartnat.enable

A parameter required for VPC access. If the region is Beijing or Shanghai, set this parameter to true.

22. spark.hadoop.odps.cupid.eni.enable

If a dedicated line (Express Connect) has been enabled, this needs to be set to true.

spark.hadoop.odps.cupid.eni.info

If a dedicated line has been enabled, this parameter also needs to be set; it identifies the VPC the user has connected.

23. spark.hadoop.odps.cupid.engine.running.type

Ordinary jobs are forcibly reclaimed if they have not finished within 3 days; streaming jobs need to set this value to longtime.

24. spark.hadoop.odps.cupid.job.capability.duration.hours

The expiration time of the permission (capability) file for streaming jobs, in hours.

25. spark.hadoop.odps.moye.trackurl.dutation

The expiration time of the JobView (track URL) for streaming jobs, in hours.

26. spark.hadoop.odps.client.spark.version

In this environment: spark-2.4.5-odps0.33.0

27. spark.hadoop.odps.spark.version

In this environment: spark-2.4.5-odps0.33.0

28. spark.hadoop.odps.cupid.proxy.end.point

In this environment: sparkui.cn-XXXXX-gwmcloud-d01.odps.ops.cloud.gwm.cn
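Not part of the original post: as a hedged sketch, some of the parameters above can be combined on a SparkConf as shown below. The object name SubmitConfSketch and all concrete values are illustrative placeholders; in practice these settings are typically passed as --conf options at submit time rather than hard-coded.

import org.apache.spark.SparkConf

// Illustrative only: placeholder values showing how the submit parameters fit together.
object SubmitConfSketch {
  def build(accessId: String, accessKey: String, endPoint: String): SparkConf =
    new SparkConf()
      .setAppName("EventOrderSuccessScala")
      .set("spark.executor.instances", "20")                   // item 1: total executors requested
      .set("spark.executor.cores", "2")                        // item 2: max parallelism = 20 * 2 = 40
      .set("spark.executor.memory", "4g")                      // item 3: heap memory per executor
      .set("spark.yarn.executor.memoryOverhead", "1024")       // item 4: off-heap memory in MB
      .set("spark.hadoop.odps.project.name", "itsl_dev")       // item 10: target MaxCompute project
      .set("spark.hadoop.odps.access.id", accessId)            // item 11
      .set("spark.hadoop.odps.access.key", accessKey)          // item 12
      .set("spark.hadoop.odps.end.point", endPoint)            // item 13
      .set("spark.hadoop.odps.task.major.version", "cupid_v2") // item 15 (public cloud value)
}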


