Chinaunix首页 | 论坛 | 博客
  • 博客访问: 482886
  • 博文数量: 80
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1916
  • 用 户 组: 普通用户
  • 注册时间: 2013-07-11 22:01
个人简介

从事实时计算多年,熟悉jstorm/spark/flink/kafka/rocketMq, 热衷于开源,希望在这里和前辈们一起学习与分享,得到长足的进步!邮箱:hustfxj@gmail.com 我的githup地址是:https://github.com/hustfxj。欢迎和大家一起交流探讨问题。

文章分类

全部博文(80)

文章存档

2017年(11)

2015年(3)

2014年(33)

2013年(33)

分类: 大数据

2017-05-27 19:38:28

1 Jstorm简介

      Jstorm是开源的分布式实时计算系统,相比于storm的优势在于稳定性、容错性和性能等。JStorm通过减少对zookeeper的访问量、增加反序列化线程、优化ACK、增加监控内容及JAVA本身优势等各个方面优化了Storm的性能和稳定性。总之,JStorm比Storm更强大、更稳定、性能更好。这里我们主要介绍jstorm的调度原理及实现,现在的jstorm相较于storm的任务调度更加合理,更加适合大集群,支持用户定制任务调度及动态扩容等高级任务调度功能。在了解调度之前,我们简单介绍下其架构和拓扑模型。如图所示表示的jstorm的系统结构。

       整个jstorm系统共存三类不同的进程,分别是nimbus,supervisor和worker。Nimbus主要负责topology提交、向zk写入元信息及任务调度;supervisor根据nimbus任务调度结果启动/停止工作进程worker;worker是jstorm任务执行者,里面的task对应拓扑模型的一个component。下图表示一个拓扑。Spout是jstorm的消息源,用于生成消息,bilt是消息的处理单元,acker是用于保证消息不丢失的component。下图存在的component有1个spout、2个bolt-A、2个bolt-B和1个acker,这些component组成了一个拓扑,消息经过拓扑树传递处理,可以得到用户想用个的结果。

结合拓扑模型以及jstorm系统结构,jstorm任务调度就是在集群中利用合适的资源,在supervisor机子上创建合理数量的worker,并将拓扑的component合理分配到各个worker上去的过程。Jstorm任务调度有以下几种特点:

  • 尽可能保证每台supervisor机子上worker数量的平均分配,尽可能保证每个worker上task数量的平均分配;
  • 为了提高集群的稳定性,尽可能的将同类型的组件分配到不同的supervisor及worker上去,如果将同类型的组件分配到了一台supervisor上去,如果这一台supervisor挂了,可能就会造成拓扑中这一类组件的丢失;
  • 为了提高性能,尽量使得存在直接消息传递关系的component-task分配到同一个worker,这样的话用一个worker下的消息传输走的就是内部通道,传递速度会比较快;
  • 支持很多高级功能,下面我们会介绍到;

2  Jstorm任务调度

       拓扑一般以参数文件的形成提交,参数文件可以设置worker数量、component并发度和worker的内存和cpu等,如下所示。在nimbus去调度任务之前,需要确定将要分配的worker数量。


Nimbus会根据每台机子的可利用cpu和内存信息,和用户设置的参数worker.memory.size、supervisor.slots.port.cpu.weight。根据getSupervisorPortList计算出每个supervisor的可利用端口数量,以及整个集群的可利用端口数量。所以最后实际的需要调度的worker数量时取集群可利用端口数量、topology.workers和拓扑的并发度总数这三者中最小值。根据每个supervisor机子可利用端口数量,来确定每个supervisor将要启用的worker数量,尽量保证supervisor上启用的worker数量平均。如图所示,黑色代表supervisor机子上已经存在的worker,在这集群上提交拓扑,需要再分配5个worker。则分配算法为红色表示,supervisor-1先分配worker-1,worker分配到supervisor次序如标号所示。

确定了各个supervisor机子上要启动的worker数量后,再确定每一个worker将要分配的task。分配task到特定的worker的原则有:

  1.  尽量将同组件的task分配到不同的worker及supervisor上去,保证该worker上同组件的task数量最少,保证通组件task到不同的supervisor或worker上去,提高了拓扑运行的稳定性;其比较函数实现如下图1,遍历所有worker利用比较函数选择出合适的worker;
  2. 尽量分配task到supervisor上worker已有task数量比较少的上去,保证每个worker上task数量均衡;其比较函数实现如下图,遍历所有worker利用比较函数选择出合适的worker;
  3. 尽量将有直接消息传输的task放到同一个worker上去,保证消息传输走内部通道,保证传递速度;遍历所有worker利用比较函数选择出合适的worker;

     task选择合适的worker,先按照规则i筛选,如果满足规则i的worker数量不止唯一;再根据规则ii筛选,结果仍然不唯一;再按照规则iii筛选。里我们以topology_nums=1,  topology_level=1, workers=24, spout.parallel=10, bolt_0.parallel=18, acker.executors=12,集群数量为6台测试为例。

  • 测试目标

以worker为维度,尽量将worker平均分配到各个supervisor上;

②保证同类型task尽量均分到不同worker和supervisor的基础上,其次保证每一个worker和supervisor的task数量均衡,最后保证直接相关的task放到一个进程上去;

  • 测试结果:

Worker被一一分配到supervisor上

Task按规定分配到work上去(共有24端口,先分配12个acker保证能够放到不同的进程且每个supervisor有2个acker;再分配18个bolt-task,其中6个task会与acker在一个进程;最后分配10个spout-task分配,应该和bolt-task在一起)

3  用户自定义任务调度

       Jstorm实现了用户自定义调度功能,提供给用户的接口是ConfigExtension.setUserDefineAssignment,用户只要自定义List数据即可。WorkerAssignment允许用户去设置这个worker的jvm参数、worker的内存大小和cpu的权重。这里我们以例子说明用户如何实现自定义调度;以下贴出了读取配置文件设置自定义调度的过程:

文件


Nimbus在任务调度时,其实会把自定义调度优先级设定为最高。

4  其他

  • 可以设置on.differ.node参数,强制要求同组件的task分布到不同supervisor;
  • 可以设置isolation.scheduler.machines参数,可以使某个拓扑只在设定的supervisor机子上进行任务调度;
  • Jstorm在9.7版本最后实现了任务的动态伸缩,参考

阅读(9003) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~