Chinaunix首页 | 论坛 | 博客
  • 博客访问: 499502
  • 博文数量: 80
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1916
  • 用 户 组: 普通用户
  • 注册时间: 2013-07-11 22:01
个人简介

从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。

文章分类

全部博文(80)

文章存档

2017年(11)

2015年(3)

2014年(33)

2013年(33)

分类: 大数据

2017-05-27 19:42:34

2.x Spark 调优(官方文档译文)

Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等。效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存。如果该程序性能遇到了阻碍,但不是内存问题(编码问题),通常来说便需要你处理带宽的瓶颈问题。但有时候,也需要你做一些程序上的优化,比如以序列化方式存储RDD,从而减小内存使用量。其中RDD的存储方式如下所示:

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER (Java and Scala) Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala) Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

数据序列化

数据序列化是影响分布式程序的性能的关键因素之一,将对象通过序列化手段转化为字节流的速度慢,或者完成该对象的序列化需要很长的个字节数组,这都会降低程序的处理效率,通常来说上述情况是你进行Spark应用程序优化的第一步。Spark旨在找到一个操作方便(允许在操作中使用任意的Java类型)且性能相对较好的折中方案。它提供了2个序列化库:

1)Java Serialization:默认启用,Spark序列化对象使用Java的ObjectOutputStream框架,这种框架支持将任何实现了java.io.Serializable接口的类进行序列化操作。你也可将待序列化的类继承java.io.Externalizable接口,从而达到进一步控制序列化性能的目的。该接口定义了两个方法来读取和序列化对象,它们分别是WriteExternal以及readExternal。Java序列化框架虽然具有很高的灵活性,但是序列化速度慢,并且对于某些类的序列化常常需要很大的序列化空间才能表述。

2)Kryo Serialization:Spark也可以使用Kryo libary(version 2)快速地对对象进行序列化,Kryo相比于java具有更快的序列化速度(通常是java的10倍)以及更强的压缩能力,缺点是不能支持序列化所有实现了Serialization接口的类,并且为了更高的性能体验,你需要在使用之前首先将类注册到Kryo之中。一个简单的对比例子如下所示:

你可以在Spark中使用Kryo方式通过在SparkConf设置spark.serializer conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 这种设置方式该序列化器,不仅在Shuffle阶段,在RDD保存到磁盘时也会被使用,使得Kryo不能成为默认配置的唯一原因就是编程人员需要对序列化类进行注册。当开发网络需求密集的程序时,本文推荐尝试使用Kryo序列化方式,因为序列化后字节数组越小意味着对带宽的要求越小。

这两种优化方式存在,能用用Kryo序列化的对象,一定可以用Java Serializer序列的关系,故而在Spark中,已经默认实现Scala中一些常用一些常用类的Kryo序列化中的注册工作,这些常用Scala类的注册工作均在ScalaKryoInstantiator文件中的AllScalaRegistar中实现。在Scala中实现了Unit方法的类或者半生类,均能够直接使用Kryo。

如下代码,提供了如何将自定义的类,注册到Kryo中:

val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf) 

如果要序列化的对象很大,那么便需要增加spark.kryoserializer.buffer的大小,这个属性的默认值是32,你可以根据需求来增加其值,以便能够完全存储该对象序列化后的二进制数组。最后,如果你并没有对类进行注册,Kyro同样能够正常工作,只不过是存储了该类的全名称(包含包名以及类名).

内存优化

本文将从3个点来考虑内存的优化:1)程序中对象所占用的内存(如果有需求将所有的数据都加载到内容) 2)访问这些对象所需内存 3)垃圾回收所需内存(如果生命周期短的对象,出现次数很多。即很少有对象步入老年代,对象回收频繁)

通常来讲,Java对象的访问速度较快,但是占用的空间大小确是实际存储数据大小的2-5倍,产生这种现象的原因,一般有如下几种可能:

1)每一个Java对象都有一个对象头,对象头大约占用16个字节,并且保存了一些信息,比如指向类的指针。这就导致,如果一个对象内部数据空间很小比如仅仅存储了一个int(4个字节),那么这个对象所占用的空间就远远大于存储数据所占用的空间。

2)Java String对象通常比实际数据所占用的空间多出40个字节左右(由于String内保存一个char数组,故而需要额外的空间来存储数组的大小等信息)。此外,由于String的内部采用UTF-16编码,故而每一个字符都被保存为2个字节,因此一个10字符的String很容易就会消耗60个字节的空间(60 = 40 + 10 * 2)。

3)常见的集合类,比如HashMap以及LinkedList,使用链表结构,并且通常会对每条记录进行进行包装(比如Map.Entry 纯粹是为了操作方便,比如entry.key entry.value)。这些类的对象中,不仅包含了对象头,也包含了一个指针(通常为8个字节,16位2个字节,32位4个字节,64位8个字节)用来指向下个位置。

4)集合中的基本类型(primitive types)通常伴随着装箱拆箱操作,故而实际存储的应该是类而非基本类型。比如int对应Integer

下面,本文首先会对Spark中的内存管理做一个概述,然后探讨通过何种策略可以在应用中更高效地利用内存。重点讲解怎样估算应用程序中对象所占空间的大小以及如何通过更改数据结构抑或以序列化格式对数据进行排序从而达到优化的目的。之后讲解优化Spark cache大小以及java垃圾回收器。

内存管理概览

Spark中内存的使用,主要分为两类,即:执行和存储。执行内存( Execution memory )是指在Shuffle、Join、Sort、aggregation操作中用来计算的内存。存储内存( storage memory)是指cache以及在集群中传播的数据(broadcast)。在Spark中,执行以及存储共享同一块空间(M)。当没有计算内存需求时,存储内存可以申请到M内的所有内存,反之亦然(vice versa)。执行操作在必要的情况下,可以申请回收存储内存,但是,当M内总的存储只剩R则不可回收,换句话说,R描述了M内的一个子分区,此子分区内缓存的数据块不会被执行内存回收。在Spark中存储内存并不会回收执行内存,原因是该功能实现起来太复杂。这种设计保证了几点理想的特性:

  • 首先,如果应用程序内没有缓存操作,那么整块空间都可以用来作为执行内存,从而避免不必要的磁盘溢出(spill to disk)。

  • 其次,使用了缓存的应用程序,可以保留最小的存储空间(R),在该存储空间内的数据块不会被执行内存回收。

  • 最后,这种方式提供了类似黑盒的操作,由系统在内部执行复杂的工作负载,从而不需要使用者完全了解在内部内存是如何进行划分的。

尽管Spark中提供了如下2个相关的配置,但是对于一般的用户来说,没有必要对该值进行调整,因为这两个默认值对于绝大多数程序来说都是可用的:

1)spark.memory.fraction 以分数的形式表达M区的大小,该分数是相对JVM heap space (默认300MB)来说的。默认为0.6,剩余的40%留作保存用户的数据结构,Spark内部元信息以及用来阻止由于数据记录稀疏且异常大而产生OOM异常的安全守卫(safeguarding )空间。

2)spark.memory.storageFraction 以分数的形式表达R区的大小,该分数是相对M来说的,默认为0.5。

确定内存消耗

1) 最好的确定一个数据集消耗内存的方式是创建一个该数据集的RDD,然后对此RDD进行cache,之后查看页面的“Storage”选项卡,在该页面上直接查看该cache RDD所需要的内存大小;

2) 估计一个具体的对象所需要的内存可以通过SizeEstimator’s estimate 方法,此方法的另外一个好处就是可以通过测试对比不同数据布局所需空间大小,从而优化内存,不仅如此,通过该方法还能够确定某个广播变量在每个executor(worker)节点上占用堆的大小

 val vector = new PrimitiveVector[Long](8192) for (i <- 0 until 8192) { vector += i
    }
    val actualSize = SizeEstimator.estimate(vector) 

数据结构优化

减少内存开销的第一步是避免使用一些会增加额外内存开销的JAVA特性,比如基于指针的数据结构以及对象的包装操作等等。如下列举了几点如何进行数据结构优化:

1)在设计数据结构时,尽量使用Array数组以及原始数据类型,而不是使用JAVA或Scala中的一些集合类(比如HashMap),虽然JCF(JAVA Collection FrameWork)设计做的很好,但是从性能上看有一定的局限性,因此出现了很多扩展JDK集合的框架出现,比如:fastutil、Apache Common Collections、Apache common Primitives、Google Guava、Huqe Collections 等等,本文建议使用fastutil。

2)尽量避免大量的小对象和指针的嵌套结构。

3)考虑使用数值ID或者枚举对象来替代字符串形式的key

4)如果单点主机内存小于32GB,设置JVM参数-XX:+UseCompressedOops来设置指针所占字节为4而并非8。该参数的设置可以在Spark-env.sh中进行,具体如下:

SPARK_DAEMON_JAVA_OPTS="-Xmx16g -Xms16g -Xmn256m -XX:+UseParNewGC

-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70

-XX:ParallelGCThreads=10 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops" 

RDD存储序列化

如果你的对象对于内存空间来说依旧过大,那么本节将起到至关重要的作用,一个最简单的减少内存使用的方法就是以序列化格式存储对象。使用RDD缓存API中序列化的存储级别(比如MEMORY_ONLY_SER),这样之后Spark将会以大字节数组的形式存储每个RDD的partition。采用序列格式存储对象唯一的不足就是访问对象的时间会增加,因为每次访问均需要对字节数组进行反序列化。本文强烈建议使用Kryo如果你想要以序列化格式缓存数据,因为Kryo序列化后的字节数组要远远小于java自带的序列化.

垃圾回收优化

JVM垃圾回收中,比较忌惮的是遇到大对象,所谓的大对象是指,需要大量连续内存空间的JAVA对象,最典型的大对象就是那种很长的字符串以及数组。但是最为忌惮的是一群"朝生夕灭"的“短命大对象”。所以如果在程序中,有许多短命的RDD时(通常来说,一次性生成RDD之后调用多次方法进行处理是没有问题的)JVM垃圾回收将会有很大的负载。当Java中需要为新对象分配空间,从而回收旧对象空间时,垃圾回收器将会跟踪查询程序中所有的对象,并找到无用的旧对象,进而对空间进行释放(Full GC),在这里你需要注意的一点就是,垃圾回收的成本和Java对象的数量成正比,因此选择涉及的对象越少的数据结构越好(比如,使用int数组,而不是使用LinkedList)这样大大减少垃圾回收的消耗。一个更好的方案就是以序列化格式存储对象,这样之后每一个RDD的partition中仅仅存在一个对象(字节数组)。如果GC是工程的瓶颈,那么在尝试使用其他方法之前,首先尝试将RDD进行序列化。

GC在任务(task)执行过程中由于工作内存以及RDD缓存的相互干扰,也可能会引发一些问题。在下面本文将会探讨如何通过控制RDD的内存分配来减轻这种干扰。

计算GC影响

GC优化的第一步便是收集统计数据,这些统计数据包含GC发生的频率以及GC需要的时间。要完成这种统计,可以通过添加如下配置到Java选项:

SPARK_DAEMON_JAVA_OPTS="-Xmx16g -Xms16g -Xmn256m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" 

配置完成之后,当再次运行job时,读者可以通过woker节点的日志查看每次GC的情况,在程序中可以这样设置:

conf.set("spark.executor.extraJavaOptions","-XX:+PrintGCDetails -XX:+PrintGCTimeStamps") 

深入GC优化

在Spark中GC优化的目标是,只有生命周期足够大的RDD才能够被放到老年代之中,并且保证新生代的空间足够用来存储程序运行期间的短生命周期对象。这将有利于预防对程序运行期创建的临时对象使用Full GC进行空间回收,如下几点将会对此起到帮助:

1)通过收集GC状态信息监控是否发生了过多的GC操作,如果在task未完成之前发生了多次Full GC那么读者就需要注意了,这表明没有足够的内存来执行相关task。

2)如果有许多Minor GC但是Major GC并不是很多,这时候给Eden区分配更多的空间是有帮助的。读者可以将Eden的空间设置为每个task执行时需要消耗的内存,或者比此内存大。假设Eden大小被估计为E,那么新生代的大小可以估计为 -Xmn = 4/3 * E(-Xmn设置新生代所占空间大小,顺便提一下JVM中Eden区与Survivor区比例为8:1,因此按照JVM中算法 -Xmn = 5/4 * E,spark中将Eden区缩小了。比例可以通过JVM SurvivorRatio进行设置)

Minor GC: 发生在新生代的垃圾收集动作,Java大对象大多都是朝生夕灭,故而Minor GC十分频繁

Major GC: Full GC,发生在老年代的GC,其速度一般会比Minor GC慢十倍左右,并且是Stop-the-World

3)如果监控到老年代空间将满,那么便需要减小用作cache的空间大小,通过设置spark.memory.fraction即可。spark.memory.fraction(默认值为0.75):用于设置存储内存和执行内存占用堆内存的比例。若值越低,则发生spill和evict的频率就越高。相比于降低任务的执行效率,缓存少量的数据是可取的。另外,可以考虑减少新生代的空间大小,也就是通过-Xmn即可实现。如果没有设置-Xmn属性的话,也可以通过设置JVM的NewRatio参数来实现,JVM虚拟机默认该值为2,这表示老年代占据堆区的2/3,该参数的取值应该足够大,至少要超过spark.memory.fraction的值。这样就表示老年代的空间足够用来保存存储内存中的对象。

4)尝试用用G1垃圾回收器。启用方式为:

-XX:+UseG1GC 

此垃圾回收器能够在某些回收成为瓶颈的项目中显著提升性能,有一点需要注意,如果executor节点的堆区很大,那么增加G1的独立区域(Region)是有必要的,这个值可以通过 -XX:G1HeapRegionSize 进行设置。

其他建议

并行度

Spark集群一般都不会被充分利用,除非对每个操作都设置了高精度的并行化参数。Spark中默认以文件的大小作为Map任务数量的依据(读者可以通过设置来控制此值,如sparkContext.textFile(file,parallelism))以及以父RDD中最大的partition数量作为reduce任务数量(如GroupByKey,reduceBykey),读者可以通过将并行程度作为第二个参数传入,或者通过配置spark.default.parallelism的值来改变Spark内的默认值。本文建议该值为机器核数 * (2 or 3)

减少任务使用内存

Spark常见的OOM异常,一般情况下并不是因为RDD不能够完全放入到内存之中,而是该工程中某个task不能够申请到足够的内存。比如在groupByKey操作中的reduce任务,常常会很大。Spark的shuffle操作(sortBykey, groupByKey,join等等)均在每个task中创建了一个Hash表,用来做分组,这个Hash表常常会异常的大。如果遇到这种情况,最简单的方式就是增加并行程度,这样每一个task的输入集合相对就会变小,从而Hash表也会变小。在Spark中最快200ms就能够运行完一个task,这是因为Spark实现了对JVM的重用(是指通过线程池复用线程来便面系统启动和切换开销),所以任务启动耗时较小,因此读者可以放心地对并行度进行增加。

广播大变量

使用Spark中的broadcast操作可以大幅减小task序列化后的空间,以及缩短集群中job的启动,如果读者在Driver端有很大的对象(比如静态数组用来查询),此次可以考虑将该值进行broadcast操作。Spark将master日志打印出序列化的任务所占字节的大小,因此读者可以通过查看日志,决定任务是否应该进行优化,一般来说如果任务的序列化后字节超过20KB,那么就意味着需要进行优化。

数据本地化

数据本地化对Spark的性能影响起到决定性的作用。如果计算和数据都在同一个节点上那么该任务的执行效率很高,但是如果计算和数据分离,那么总会有一方向另一方移动。通常来讲是计算向数据移动,因为序列后的代码相比如数据来说所占的内存不大。 如下介绍了由于数据与计算的距离的不同,处理效率的变化:

  • PROCESS_LOCAL 数据和计算在同一个JVM中,这是本地化中效果最好的

  • NODE_LOCAL 数据和计算在同一个节点之中,这种方式要比PROCESS_LOCAL 慢些,因为数据需要在进程间传递。

  • NO_PREF 数据访问速度完全相同

  • RACK_LOCAL 数据和计算在同一机架之中,但是在不同的节点,这样数据可以通过1次交换机在局域网中进行传输。性能最差的当属,计算和数据在不同的机架之中,并且需要通过网络传输。

PROCESS_LOCAL 是Spark中期望的现象,但是由于种种原因,并不会总是出现这种情况,此种情况下Spark会从上述级别中依次向下选取,直至成功。在选择时这里其实存在两个选项:

1)等待数据节点上的CPU忙完之后,然后再运行。

2)找一个空闲的CPU立即开启工作,然后搬运数据。

后续

现在spark相关的技术文档基本上都是1.x版本,而在1.6+版本之后, Databricks 公司提出的对Spark优化内存和CPU使用的计划,美其名曰Tungsten Project ,该计划对Spark SQL优化的最多。不过对部分RDD API 还有Shuffle也是有益。之后我们团队会定期总结2.x版本相关文档,请期待下一遍:详尽spark的内存管理。


阅读(1455) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~