从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。
分类: 大数据
2017-05-27 19:57:20
对于每种输入数据的DStream,都有一个Receiver对象与之相关联,对于每个Receiver对象,又有一个Source与之相对应。每一个Receiver对象代表一个数据接收端实例(即只有一个executor使用一个core来接收数据,并发度为1),如果要提高并发度,可以通过创 建多个Receiver对象来实现,方法如下:
val numStreams = 5 val kafkaStreams =(1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream =streamingContext.union(kafkaStreams) unifiedStream.print()
其中,streamingContext.union()把多个Receiver的数据合并。给应用分配的cores数量必须大于Receiver数目,这样才能保证每个Receiver占用一个core的同时,至少还有另一个cores来处理数据。无论master是local的还是集群的,都应该这样配置,否则数据将得不到线程资源来处理。
利用KfakaDreatDstream定义的receiver的并发不是这样配置的,具体配置待补充……
##调整source端的并发 主要根据batch Interval和block Interval时间来调整,一般source端task并发数量是batchInterval/blockInterVal。具体可以参考 createBlockRDD()函数实现,每次blockInterVal时间端生存RDD一个分区的数据,所以batchInterVal时间,source端的RDD分区数量=batchInterval/blockInterVal,即source端task数量。
reduceByKey 类型的操作,结果RDD的分区数可以通过参数传入。否则其结果RDD分区数由spark.default.parallelism来决定。这个property的默认值是父RDD的分区数。
Spark实现流式计算的方式,其实相当于把数据分割成很小的时间段,在每个小时间段内做Spark批量计算。所以,这个时间段的大小直接决定了流式系统的性能。如果设置的太大,时效性不好,如果设置的太小,很可能计算数率赶不上数据流入的速度。一 般决定合适的batch interval的方式是:先用较大的batch interval和较低的数据量运行流式应用,从web UI上观察数据平均end-to-end时延,如果平稳且较低,则可以逐步减小batch interval或增大数据量,直至end-to-end时延平稳、小于batch interval并在一个等级上。这个时候一般是合适的batch interval。
对于一个需要做checkpoint的DStream结构,可以通过调用DStream.checkpoint(checkpointInterval) 来设置ckeckpoint的周期,经验上一般将这个checkpoint周期设置成batch周期的5至10倍。
提交应用的时候通过配置相关参数,例如--executor-cores 2 --total-executor-cores 6,那么这次提交的应用需要的executor和driver数量=3;还可以结合spark.cores.max一起来确定executors数量。但不是绝对的,如果机器本身可利用的cores不多的话,那么spark确定的executors数量是打折扣的。spark.cores.max 这个参数决定了在Standalone和Mesos模式下,一个Spark应用程序所能申请的CPU Core的数量。这个参数对Yarn模式不起作用,YARN模式下,资源由Yarn统一调度管理,一个应用启动时所申请的CPU资源的数量由另外两个直接配置Executor的数量和每个Executor中core数量的参数决定。在yarn模式下可以直接通过参数--num-executors指定该应用所需的executor数量;
还可以参考: