irpas技术客

Spark SQL参数调优指南_longlovefilm_spark sql 调优

网络投稿 8231

目录

1 运行行为

1.1 动态生成分区

1.2 broadcast join

使用hint强制做broadcastjoin:

1.3 动态资源分配

1.4 Shuflle相关

1.5 读ORC表优化

2 executor能力

2.1内存

2.2 executor并发度

2.3 executor读取hive表时单task处理数据量/无shuffle作业小文件合并

2.4 GC优化(使用较少,当尝试其他调优方法均无效时可尝试此方法)

3 driver指标:

3.1 内存

3.2 GC优化

一、常用参数

-- map端小文件合并 set hive.merge.mapfiles=true; -- reduce端小文件合并 set hive.merge.mapredfiles=true; -- 小文件合并的阈值 set spark.sql.mergeSmallFileSize=128000000; -- 小文件合并的task中,每个task读取的数据量 set spark.sql.targetBytesInPartitionWhenMerge=128000000; -- 普通task读取的数据量,原来的值是33554432 (33M) set spark.hadoopRDD.targetBytesInPartition=256000000; -- 启动参数 set spark.sql.rangePartition.exchangeCoordinator=true; -- 控制shuffle阶段每个task读取的数据量为256M --spark2只对最后一个stage进行shuffle分区合并,spark3对中间的stage也生效,在spark2的时候有些同学会依赖spark.sql.adaptive.shuffle.targetPostShuffleInputSize进行小文件合并,在spark3上如果设置的这个参数,影响了中间stage可能会使作业的运行时间变长,我们对小文件的问题有专门的feature,可以设置下面两个参数 set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=256000000;

二、正文

本文目标读者为对美团点评数据仓库和Spark SQL运行原理有基本了解的数据开发人员。

目前,Spark在美团点评的数仓生产中主要执行Hive表到Hive表的生产任务,尤其对有多轮shuffle的作业有很好的性能提升。美团点评用于ETL生产的Spark集群架设在HDFS、Yarn之上,依靠Hive metastore server进行元数据管理。

Spark完成一个数据生产任务(执行一条SQL)的基本过程如下:

(1)对SQL进行语法分析,生成逻辑执行计划——(2)从Hive metastore server获取表信息,结合逻辑执行计划生成并优化物理执行计划——(3)根据物理执行计划向Yarn申请资源(executor),调度task到executor执行。——(4)从HDFS读取数据,任务执行,任务执行结束后将数据写回HDFS。

本文从作业运行行为、executor处理能力、driver能力三个方面进行介绍。

作业运行行为主要影响(3)阶段中,task被如何调度,有shuffle时Reducer的个数等。

executor处理能力主要影响(3)阶段中task被调度到executor后,executor能否正常完成任务。

driver能力主要影响(2)(3)阶段。在(2)阶段中,如果用户表为ORC表,driver可能读取file footer等信息,会导致driver读取HDFS,如果这部分信息太大,则可能会造成driver存在内存压力。(3)阶段中,driver与Yarn RM交互,申请到executor后进行调度,task执行结束会生成一定执行指标信息和任务执行元数据信息返回给driver,同时每个executor还会和driver有心跳连接,这些都是driver运行的负载。

1 运行行为 1.1 动态生成分区

下列Hive参数对Spark同样起作用。

set hive.exec.dynamic.partition=true;?//?是否允许动态生成分区

set hive.exec.dynamic.partition.mode=nonstrict;?// 是否容忍指定分区全部动态生成

set hive.exec.max.dynamic.partitions = 100;?// 动态生成的最多分区数

1.2 broadcast join

当大表JOIN小表时,如果小表足够小,可以将大表分片,分别用小表和每个大表的分片进行JOIN,最后汇总,能够大大提升作业性能。

spark.sql.autoBroadcastJoinThreshold 平台默认值为26214400(25M),如果小表的大小小于该值,则会将小表广播到所有executor中,使JOIN快速完成。如果该值设置太大,则会导致executor内存压力过大,容易出现OOM。

注:ORC格式的表会对数据进行压缩,通常压缩比为2到3左右,但有些表的压缩比就会很高,有时可以达到10。请妥善配置该参数,并配合spark.executor.memory,使作业能够顺利执行。

使用hint强制做broadcastjoin:

有时候,可能会遇到引擎无法识别表大小的情况,可以使用hint强制执行broadcast join,如下所示。Spark可以识别/*+ MAPJOIN(l) */和/*+ BROADCASTJOIN(u) */两种,Hive只能识别/*+ MAPJOIN(l) */,因此建议使用/*+ MAPJOIN(l) */。

代码块

SQL

select /*+ MAPJOIN(l) */ i.a, i.b, l.b from tmp1 i join tmp2 l ON i.a = l.a; 1.3 动态资源分配

spark.dynamicAllocation.enabled:是否开启动态资源分配,平台默认开启,同时强烈建议用户不要关闭。理由:开启动态资源分配后,Spark可以根据当前作业的负载动态申请和释放资源。

spark.dynamicAllocation.maxExecutors: 开启动态资源分配后,同一时刻,最多可申请的executor个数。平台默认设置为1000。当在Spark UI中观察到task较多时,可适当调大此参数,保证task能够并发执行完成,缩短作业执行时间。

下图是一个由于并发不足导致作业执行较慢的一个明显的任务:

打开执行时间较长的stage,查看其任务数为2w+。

点击stage的链接,进入查看stage中的任务,将任务按照Launch Time排序,先有小到大再由大到小。

可以看到任务启动时间差了3个多小时。可以确定该任务是由于spark.dynamicAllocation.maxExecutors过小导致的。

该参数可以和spark.executor.cores配合增大作业并发度。s

spark.dynamicAllocation.minExecutors: 和s,d,maxExecutors相反,此参数限定了某一时刻executor的最小个数。平台默认设置为3,即在任何时刻,作业都会保持至少有3个及以上的executor存活,保证任务可以迅速调度。

1.4 Shuflle相关

spark.sql.shuffle.partitions: 在有JOIN或聚合等需要shuffle的操作时,从mapper端写出的partition个数,平台默认设置为2000。

如select a, avg(c) from test_table group by a语句,不考虑优化行为,如果一个map端的task中包含有3000个a,根据spark.sql.shuffle.partitions=2000,会将计算结果分成2000份partition(例如按2000取余),写到磁盘,启动2000个reducer,每个reducer从每个mapper端拉取对应索引的partition。

当作业数据较多时,适当调大该值,当作业数据较少时,适当调小以节省资源。

spark.sql.adaptive.enabled:是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行。平台默认开启,同时强烈建议开启。理由:更好利用单个executor的性能,还能缓解小文件问题。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize:和spark.sql.adaptive.enabled配合使用,当开启调整partition功能后,当mapper端两个partition的数据合并后数据量小于targetPostShuffleInputSize时,Spark会将两个partition进行合并到一个reducer端进行处理。平台默认为67108864(64M),用户可根据自身作业的情况酌情调整该值。当调大该值时,一个reduce端task处理的数据量变大,最终产出的数据,存到HDFS上的文件也变大。当调小该值时,相反。

代码块

Plain Text

18/01/16 03:18:03 WARN TransportChannelHandler: Exception in connection from rz-data-hdp-dn3938.rz.sankuai.com/10.16.47.49:7337 io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2013265920, max: 2022178816) at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530) at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484) at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711) at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700) at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237) at io.netty.buffer.PoolArena.allocate(PoolArena.java:221) at io.netty.buffer.PoolArena.allocate(PoolArena.java:141) at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296) at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177) at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168) at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129) at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745)

以及

代码块

Plain Text

18/01/16 18:36:03 WARN TransportChannelHandler: Exception in connection from rz-data-hdp-dn0871.rz.sankuai.com/10.16.57.13:34925 java.lang.IllegalArgumentException: Too large frame: 3608642420 at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119) at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745)

spark.sql.adaptive.minNumPostShufflePartitions: 当spark.sql.adaptive.enabled参数开启后,有时会导致很多分区被合并,为了防止分区过少,可以设置spark.sql.adaptive.minNumPostShufflePartitions参数,防止分区过少而影响性能。

1.5 读ORC表优化

spark.hadoop.hive.exec.orc.split.strategy参数控制在读取ORC表时生成split的策略。BI策略以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 * 1024 * 1024)时使用ETL策略,否则使用BI策略。

对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。

对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。

另外,spark.hadoop.mapreduce.input.fileinputformat.split.maxsize参数可以控制在ORC切分时stripe的合并处理。具体逻辑是,当几个stripe的大小大于spark.hadoop.mapreduce.input.fileinputformat.split.maxsize时,会合并到一个task中处理。可以适当调小该值,如set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728。以此增大读ORC表的并发。

比如我想要以64M大小划分split,可以在XT ETL中设置如下参数:

代码块

SQL

set spark.hadoop.hive.exec.orc.split.strategy=ETL; set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=64000000; set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=64000000; -- 避免将拆分的split再合并 set spark.hadoopRDD.targetBytesInPartition=-1; 2 executor能力 2.1内存

spark.executor.memory:executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存。当executor端由于OOM时,多数是由于spark.executor.memory设置较小引起的。该参数一般可以根据表中单个文件的大小进行估计,但是如果是压缩表如ORC,则需要对文件大小乘以2~3倍,这是由于文件解压后所占空间要增长2~3倍。平台默认设置为2G。

spark.yarn.executor.memoryOverhead:Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。

Spark根据spark.executor.memory+spark.yarn.executor.memoryOverhead的值向RM申请一个容器,当executor运行时使用的内存超过这个限制时,会被yarn kill掉。在Spark UI中相应失败的task的错误信息为:

代码块

Plain Text

Container killed by YARN for exceeding memory limits. XXX of YYY physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

这个时候,适当调大spark.yarn.executor.memoryOverhead。平台默认设置为1024(1G),注意:该参数的单位为MB。但是,如果用户在代码中无限制的使用堆外内存。调大该参数没有意义。需要用户了解自己的代码在executor中的行为,合理使用堆内堆外内存。

spark.sql.windowExec.buffer.spill.threshold:当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘。该参数如果设置的过小,会导致spark频繁写磁盘,如果设置过大则一个窗口中的数据全都留在内存,有OOM的风险。但是,为了实现快速读入磁盘的数据,spark在每次读磁盘数据时,会保存一个1M的缓存。

举例:当spark.sql.windowExec.buffer.spill.threshold为10时,如果一个窗口有100条数据,则spark会写9((100 - 10)/10)次磁盘,在读的时候,会创建9个磁盘reader,每个reader会有一个1M的空间做缓存,也就是说会额外增加9M的空间。

当某个窗口中数据特别多时,会导致写磁盘特别频繁,就会占用很大的内存空间作缓存。因此如果观察到executor的日志中存在大量如下内容,则可以考虑适当调大该参数,平台默认该参数为40960。

代码块

Plain Text

pilling data because number of spilledRecords crossed the threshold 2.2 executor并发度

spark.executor.cores:单个executor上可以同时运行的task数。Spark中的task调度在线程上,该参数决定了一个executor上可以并行执行几个task。这几个task共享同一个executor的内存(spark.executor.memory+spark.yarn.executor.memoryOverhead)。适当提高该参数的值,可以有效增加程序的并发度,是作业执行的更快,但使executor端的日志变得不易阅读,同时增加executor内存压力,容易出现OOM。在作业executor端出现OOM时,如果不能增大spark.executor.memory,可以适当降低该值。平台默认设置为1。

该参数是executor的并发度,和spark.dynamicAllocation.maxExecutors配合,可以提高整个作业的并发度。

2.3 executor读取hive表时单task处理数据量/无shuffle作业小文件合并

spark.hadoopRDD.targetBytesInPartition:该参数是美团点评特有参数,目前还未反馈给社区。Spark在读取hive表时,默认会为每个文件创建一个task,如果一个SQL没有shuffle类型的算子,每个task执行完都会产生一个文件写回HDFS,这样就潜在存在小文件问题。该参数可以将多个文件放到一个task中处理,默认为33554432,即如果一个文件和另一个文件大小之和小于32M,就会被放到一个task钟处理。适当提高该值,可以降低调度压力,避免无shuffle作业产生过多小文件。

2.4 GC优化(使用较少,当尝试其他调优方法均无效时可尝试此方法)

executor的JVM参数传递方式为:set?spark.executor.extraJavaOptions="XXXXXXXXXX "。例如,set?spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC"。

注:所有的JVM参数必须写在一起,不能分开。bad case:set?spark.executor.extraJavaOptions="-XX:NewRatio=3?";?set?spark.executor.extraJavaOptions="-XX:+UseG1GC?" ;

打开GC打印:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

full GC 频繁:内存不够用,调大spark.executor.memory,调小spark.executor.cores。

minor GC频繁,而full GC比较少:可以适当提高Eden区大小-Xmn

如果OldGen区快要满了,适当提高spark.executor.memory(平台默认2G)或适当降低spark.memory.fraction(平台默认为0.3)或适当提高-XX:NewRatio(老年代是年轻代的多少倍,一般默认是2)。

如果spark.executor.memory调的很大且GC仍是程序运行的瓶颈,可以尝试启用G1垃圾回收器(-XX:+UseG1GC)

修改了GC的参数一定要仔细观察GC的频率和时间。

修改方法:set?spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC ..."

3 driver指标: 3.1 内存

spark.driver.memory:driver使用内存大小, 平台默认为10G,根据作业的大小可以适当增大或减小此值。

3.2 GC优化

通过set?spark.driver.extraJavaOptions="XXXXXXXXXX "设置,具体设置内容可参考2.4节,一般情况driver内存较大,可尝试启用G1垃圾回收器。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #spark #SQL #调优 #目录1 #运行行为11 #动态生成分区12 #broadcast