Chinaunix首页 | 论坛 | 博客
  • 博客访问: 499509
  • 博文数量: 80
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1916
  • 用 户 组: 普通用户
  • 注册时间: 2013-07-11 22:01
个人简介

从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。

文章分类

全部博文(80)

文章存档

2017年(11)

2015年(3)

2014年(33)

2013年(33)

分类: 大数据

2017-05-27 19:57:20

Receiver 并发

对于每种输入数据的DStream,都有一个Receiver对象与之相关联,对于每个Receiver对象,又有一个Source与之相对应。每一个Receiver对象代表一个数据接收端实例(即只有一个executor使用一个core来接收数据,并发度为1),如果要提高并发度,可以通过创 建多个Receiver对象来实现,方法如下:

val numStreams = 5
val kafkaStreams =(1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream =streamingContext.union(kafkaStreams)
unifiedStream.print()

其中,streamingContext.union()把多个Receiver的数据合并。给应用分配的cores数量必须大于Receiver数目,这样才能保证每个Receiver占用一个core的同时,至少还有另一个cores来处理数据。无论master是local的还是集群的,都应该这样配置,否则数据将得不到线程资源来处理。

Kafka DrectDstream 并发

利用KfakaDreatDstream定义的receiver的并发不是这样配置的,具体配置待补充……

##调整source端的并发 主要根据batch Interval和block Interval时间来调整,一般source端task并发数量是batchInterval/blockInterVal。具体可以参考 createBlockRDD()函数实现,每次blockInterVal时间端生存RDD一个分区的数据,所以batchInterVal时间,source端的RDD分区数量=batchInterval/blockInterVal,即source端task数量。

调整数据处理的并发度

reduceByKey 类型的操作,结果RDD的分区数可以通过参数传入。否则其结果RDD分区数由spark.default.parallelism来决定。这个property的默认值是父RDD的分区数。

调整数据序列化方式

调整batch interval

Spark实现流式计算的方式,其实相当于把数据分割成很小的时间段,在每个小时间段内做Spark批量计算。所以,这个时间段的大小直接决定了流式系统的性能。如果设置的太大,时效性不好,如果设置的太小,很可能计算数率赶不上数据流入的速度。一 般决定合适的batch interval的方式是:先用较大的batch interval和较低的数据量运行流式应用,从web UI上观察数据平均end-to-end时延,如果平稳且较低,则可以逐步减小batch interval或增大数据量,直至end-to-end时延平稳、小于batch interval并在一个等级上。这个时候一般是合适的batch interval。

设置流式数据checkpoint的周期

对于一个需要做checkpoint的DStream结构,可以通过调用DStream.checkpoint(checkpointInterval) 来设置ckeckpoint的周期,经验上一般将这个checkpoint周期设置成batch周期的5至10倍。

executors 并发数量

提交应用的时候通过配置相关参数,例如--executor-cores 2 --total-executor-cores 6,那么这次提交的应用需要的executor和driver数量=3;还可以结合spark.cores.max一起来确定executors数量。但不是绝对的,如果机器本身可利用的cores不多的话,那么spark确定的executors数量是打折扣的。spark.cores.max 这个参数决定了在Standalone和Mesos模式下,一个Spark应用程序所能申请的CPU Core的数量。这个参数对Yarn模式不起作用,YARN模式下,资源由Yarn统一调度管理,一个应用启动时所申请的CPU资源的数量由另外两个直接配置Executor的数量和每个Executor中core数量的参数决定。在yarn模式下可以直接通过参数--num-executors指定该应用所需的executor数量;

调优经验总结

  • 单条记录消耗大,使用mapPartition替换map,mapPartition是对每个Partition进行计算,而map是对partition中的每条记录进行计算;
  • 任务执行速度倾斜,如果是数据倾斜,一般是partition key取的不好,可以考虑其它的并行处理方式 ,并在中间加上aggregation操作;如果是Worker倾斜,例如在某些worker上的executor执行缓慢,可以通过设置spark.speculation=true 把那些持续慢的节点去掉;
  • reduce task数目不合适,需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism。通常,reduce数目设置为core数目的2到3倍。数量太大,造成很多小任务,增加启动任务的开销;数目太少,任务运行缓慢。

其他

还可以参考:

阅读(2099) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~