Chinaunix首页 | 论坛 | 博客
  • 博客访问: 127713
  • 博文数量: 69
  • 博客积分: 2895
  • 博客等级: 少校
  • 技术积分: 710
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-03 18:05
文章分类

全部博文(69)

文章存档

2010年(69)

我的朋友

分类:

2010-09-09 13:34:38

1. 修改mapper和reducer数量

2. 尽量的用Combiner

3. 选用最合适job的Writable

4. 在mapper和reducer中重用Writable


1.的的作业在运行过程中常常碰到一些这样的情 况:
  • 每一个map或者reduce只有30-40秒钟就结束
  • 超 大规模的时,通常会需要大量的map和reduce的slots 支持,但是job运行起来后,running的map和reduce并没有沾满集群的可用slots
  • 当几乎所有的map和 reducers都在调度中运行着,此时却有 一个或者两个pending的map或者reduce,一直不跑,使得job一直无法正常结束。

对一 个job的map数和reduce数的设定对一个job的运行是非常重要的,并且非常简单。以下是一些这几个值的经验总结:

  • 如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该job的map或者reduce数,每一个task(map|reduce)的setup和加入到 调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。JVM 的reuse方式也可以这个问题。
  • 如 果某个input的非常的大,比如 1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB,这样map和reduce的可以减小。而且用户还可以通过:hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata /path/to/inputdata-with-largeblocks的方式来将已经存在咋hdfs上的数据进行大块化。然后删除掉原先的文件。
  • 只 要每个task都运行至少30-40秒钟,就可以考虑将mapper数扩大,比如集群的map slots为100个,那么就不要将一个job的mapper设成101,这样前100个map能够并行完成,而最后一个map要在前100个 mapper结束后才开始,因此在reduce开始运行前,map阶段的时间几乎就要翻倍。
  • 尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。

测试对比:

调 整运行参数( -Dmapred.max.split.size=$[16*1024*1024] ), 或者在配置文件中对将 mapred.max.split.size设置成$[16*1024*1024] ,hadoop 中的wordcount任务的mapper数就会受到用户控制。当运行这种配置的任务时,每个task都会在10秒钟之内运行完,而从 jobtracker的webui上可以看到cluster的总体情况和job的情况,其中可以看到,running的map数频繁的在0-24之间波 动。整个job17分52秒完成,比使用原始配置的job的运行时间的两倍还多。

2.当运行中发现如下的现象,通常job是 可以优化的:

  • Job的运行过程中会有一系列的sort 的操作,并且reduce input groups的counter变量的值远远要小于reduce input records counter。
  • Job在 mapper完成以后,shuffle过程中传输了大量的中间结果(例如:每个slave上的map output bytes都好几个GB)
  • 在job 的webui上的counter中看到,job的spilled records的数量远远要大于map output records的数量。

如 果job的算法中涉及到许多的排序操作,可以尝试写一个来 提高job性能。的框架中提供了Combiner来减少中间结 果对磁盘的写入和减少中间结果在mapper和reducer之间的传输,通常这两个方面都是非常影响作业性能的两个方面。

性 能对比:

修改wordcount程序,将setCombinerClass去掉,或者不去掉,两种方式对比 运行。去掉的结果是,让每一个mapper的运行时间由原先的33s变成了平均48s,并且shuffle过程中的中间也由1G变成1.4GB,整个的job由原先的8分30 秒变成了15分42秒,将近两倍了。而且,这个测试还是在enable了map的output 压缩的情况下进行的,如果disable这个特性,性能的影响可能会更加大。

3.如果在mapreduce的作业中有类似这样的情况,那么就肯定是可以优化的:

  • Text 对象被用来处理非text的或者复杂结构的数据
  • IntWritable orLongWritable objects are used when most output values tend to be significantly smaller than the maximum value.
  • 当大多数的output中的value都比最大的value值要小的多的时候使用 IntWritable 或者 LongWritable对象。

当mapreduce的新手写mapreduce程序时,或是习惯了 hadoop的streaming作业的使用方式,转而使用mapreduce来写程序时,常常会有人不正确的使用Text类,很多的时候,没有这个必 要。虽然使用Text非常方便,但是,从数字类型转换成UTF8的字符串的效率是很低的,如果这样的操作很多,是会占用过多的cpu时间而导致作业效率降 低。任何时候,当处理非文本数据(数字,浮点等类型),使用IntWritable或者FloatWritable的效率会高很多。

除了 Text的问题,二进制的对象通常还会占 用更少的存储。由于磁盘IO和的传输常常都是 mapreduce作业的瓶颈所在,这样做可以很大程度的提高job的性能,尤其是当job的规模很大的时候。当处理Integer数据时,同样有时是可 以使用VIntWritable或者VLongWritable来提高性能。不仅可以减少cpu占用,也可以减少空间占用。例如,4,当序列化时会被序列 化成1个byte,而10000序列化时会占用2个bytes。这种类似couters的值,序列化时的length就会很重要。当数据都是很小的数字, 而只有一些是很大的时候,这样的效率节约是非常可观的。

如果hadoop中自带的Writable的各种类型不能满足需要,可以自己写一 个。非常的简单,这样也可能会比直接用Text,然后解析来的高效。在写自己的Writable时,尝试提供一个RawComparator(可以从 hadoop的源码中找到类似的实现)。

同样,如果maprduce job是一系列job中的一个,可以在中间结果和中间job的output中都使用SequenceFile来保存中间结果,即使最后的输出需要使用文本 化的数据。这样可以减少数据的中间结果,减低磁盘的 IO,并且节省网络传输。

测试对比:

对 于example中的word count job,我们可以修改value中的数据类型从IntWritable为Text,在reduce中,用Integer.parseString(value.toString()) 来接收value信息。job的性能直接就降低了10%,整个job花了9分钟,并且每一个map都花费了36秒钟的时间,原先只需要33秒。由于 Integer的parse本身比较快,这样做并没有太大程度上的影响性能,而在通常情况下,不同的使用方式会产生2-3倍的性能差。

4. hadoop的mapreduce作业通常有如下描述时,会不够高效,如下:

  • 在tasktracker进程中加入如下配置:-verbose:gc -XX:+PrintGCDetails,然后tasktrakcer就会在运行过程中将GC的detail信息打印到log中,根据log观察,如果能 够看到jvm花了很多的时间来做垃圾回收,那么通常,程序中new 了太多的不必要的对象。
  • 在自己的源代码中grep一下“new Text”或者“new IntWritable”,如果在某些循环中找到这样的代码,或者mapper或者reducer中找到这样的代码,通常代码就不够高效。
  • 以 上tips在tasks被限制在内存中时通常都非常有效。

对于mapredcue的新手来说,常常会犯这样的一个错误,在自己的程序 中大量的new Writable对象。尤其是在mapper函数或者reducer函数中。例如,有些人可能会这样来实现wordcunt:

public void map(...) {
...
for (String word : words) {
output.collect(new Text(word), new IntWritable(1));
}
}

这种实现会造成:非常非常多的临时对象被分配内存。
这样会造成jvm花费很多的时间来进行垃圾回收。正确的做法是:
class MyMapper ... {
 Text wordText = new Text();
IntWritable ne = new IntWritable(1);
public void map(...) {
...
for (String word : words) {
wordText.set(word);
output.collect(word, one);
}
}
}
这就是说,在mapreduce中,Writable对象是可重用的,其实这也是hadoop中为什么会使用
writable对象(如Text,IntWritable等),而不是采用原声的 对应对象的原因。

测 试对比:

当把wordcount的代码修改成以上的样子,job的运行时间并没有太大的改变,但是这是因为在配置中默认 设定了每个task的heap size的最大为1GB,所以GC在jvm的内存占用没有到一定阈值的时候根本就没有运行。但是,如果将heap size修改为200MB,那么原本8分30秒能完成的作业,变成了17分钟才能完成。所以,在hadoop的mapreduce代码书写的时候,注意这 样的一些小,是非常有用的。

阅读(676) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~