1. 前言
本文通过对MapReduce的分析,列出MapReduce存在的问题,然后提出一种解决这些问题的改进型MapReduce,这种改进型的MapReduce暂且取名为MapBalanceReduce。由于经验和水平有限,所述观点和方法未必正确,诚心欢迎交流探讨。
2. 调度实体
在MapReduce和改进型MapReduce都存在Job和Task,它们之间的关系如下UML图所示:
Job是由一到多个Tasks组成的Tasks池,同一个Job内的各Task间是平等独立,不存在依赖也不存在优先级高低。Job Tree是在MapReduce之上的一层调度体,如存在于Hadoop Hive。
计算框架的作用就是通过将Job分解成Tasks,然后调配Tasks到集群中各节点去执行。因此Tasks是整个系统均衡和调度的核心对象。可以说,控制好了Tasks,就能够调度好均衡好,否则就可能发生数据倾斜,一些节点累死而另一些节点饿死。
3. MapReduce问题
MapReduce最重要的基础是DFS(分布式文件系统),它的工作原理可简单的使用下图表示,包含了map和reduce两个最核心的过程,以及A、B和C三个数据输入输出:
通过分析A、B和C不难得出如下表所示的特征:
| A | B | C |
存储位置 | DFS | 本地存储 | DFS |
块大小是否均衡? | 是 | 否 | 是 |
块大小是否可确定? | 是 | 否 | 是 |
map和reduce的块大小是否接近? | 不确定,非受控 |
map个数是否已知,非动态确定? | 是 |
reduce个数是否已知,非动态确定? | 是 |
除了上表所述的特征外,B部分的数据块个数通常为预先指定的reduce个数,因此其值通常不大,而且将reduce个数增倍意义也不大。
3.1. map
由于map的输入源自于DFS,是相对静态的数据,所以各MapTask是均衡的,而且其大小是已知和确定的。
3.2. reduce
reduce不同于map,它处理的是map后的数据,是动态产生的数据,只有在map完成之后才能确定的数据(包括数据的分布和大小等)。数据在经过map之后,就映射到了某个ReducdeTask,不能再更改,而且ReduceTask的个数也是不能修改的。
这会带来如下严重的问题:
1) reduce端数据倾斜:比如有些ReduceTask需要处理100GB数据,而另有一些只需要处理10GB数据,甚至还有些ReduceTask可能空转,没有任何数据需要处理。最严重时,可能发生某个ReduceTask被分配了超出本地可用存储空间的数据量,或是超大数据量,需要特长处理时间;
2) reduce端数据倾斜直接导致了ReduceTask不均衡;
3) 并行Job困难。类似于操作系统进程调度,如果要并行,必然存在Job间的调度切换,但由于ReduceTask需要处理的数据量可能很大,需要运行很长的时间,如果强制停止ReduceTask,对于大的ReduceTask会浪费大量的已运行时间,甚至可能导致一个大的Job运行失败。因此,无法实现类似于进程的并行调度器。
3.3. 数据不均衡的两种情况
数据不均衡可分为两类:
1) KEY过于聚集,即不同KEY的个数虽多,但经过映射(如HASH)后,过于聚集在一起
采用两种办法相结合:一是将KEY分散得足够大(如HASH桶数够多),二是在balance的时候,进行重HASH,将大的打成小的。
2) KEY值单一,即不同KEY的个数少
对于这种情况,采用HASH再分散的方法无效,事先也无法分散得足够大,但处理的方法也非常简单。对这种情况,按照大小进行横切即可,但这个时候一次reduce无法得到最终结果,至少需要连接两次reduce,另外还需要增加balance接口,以方便区别是最后一次reduce,还是中间的reduce。
3.4. 总结:两大主要问题
总的来说,MapReduce存在如下两大问题:
1) reduce并非不均衡,可能导致严重的倾斜;
2) 并行调度能力弱,这是因为每个Task(主要是ReduceTask)的时间粒度不可控制。
4. 改进型MapReduce
4.1. 方案介绍
改进型MapReduce在map和reduce中间,增加一个balance过程,这个balance是可选的,只在必要时发生,如下图所示:
如果map输出的数据已经符合reduce均衡的要求,那么balance过程就什么也不做,否则它会在数据由map输出传递给reduce之前,做一次重新分配,重分配的目的是保证每个reduce的输入基本相同(允许有一定的差额),而且大小在指定的值上下浮动。
这个过程中,并不存在数据先由map传递给balance,再由balance传递给reduce,因此不会带来较大的额外开销。对于改进型MapReduce,其表现为:
| X | Y | Z | W |
存储位置 | DFS | 本地存储 | 本地存储 | DFS |
块大小是否均衡? | 是 | 否 | 是 | 是 |
块大小是否可确定? | 是 | 否 | 是 | 是 |
map和reduce的块大小是否接近? | 是,有保证的 |
map个数是否已知,非动态确定? | 是 |
reduce个数是否已知,非动态确定? | 否,动态确定 |
map输出时,将数据按指定的规则(如Hash),分成足够多的块(在MapReduce方案中为reduce个数),目的是方便在balance时,可以保证新的新块是均衡和大小在指定的范围内,所以map输出的块个数相对于MapReduce方案要多很多,通常为10倍以上,因为相对较小的块组合成指定大小的块简单高效些。
balance发生在数据由map本地传输到reduce的过程中,它相当于一个路由器,在map的基础上,对数据进行再映射,这个过程的开销很小,因为不需要对数据进行计算,而只是确定块对应到哪个reduce。
map输出的块,仍可能过大,这个时候需要balance对这部分数据进行拆分,可以将这些需要较长处理时间的工作,定义成一个新的Task,比如BalanceTask。
4.2. 并行调度
在所有Task均衡,且其大小是可控的前提下,并行调度就可以仿照进程调度去做。我们可以将Task当作一个运行时间片,由于其大小可以控制,所以只要大小适当,基本上就可以控制其运行时长。
当一个Task运行完后,根据调度规则来决定下一个运行的Task,下一个Task并不一定是同一个Job,和操作系统进程调度对比如下:
4.3. 新的不足
改进型的MapReducke,由于在reduce之前需要一个balance过程,所以reduce和map两个过程在时间上没有重叠,因此对于单个Job,它的执行效率可能比MapReduce低。但一个Job的map可以和另一个Job的reduce在时间上重叠,因此并行多Job调度时,就不存在这样的不足了,而实际情况通常都是多Job并行调度,所以这个不足可以忽略。
4.4. 总结
改进型MapReduce,实际上在MapReduce上做两件事:保证ReduceTask均衡和控制ReduceTask大小。
讨论: