Chinaunix首页 | 论坛 | 博客
  • 博客访问: 132359
  • 博文数量: 69
  • 博客积分: 2895
  • 博客等级: 少校
  • 技术积分: 710
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-03 18:05
文章分类

全部博文(69)

文章存档

2010年(69)

我的朋友

分类:

2010-09-09 13:40:50

hadoop中的map/reduce编程中有几个非常关键的组件,其中包括 Mapper,Reducer,InputFormat,OutputFormat,OutputKeyClass,OutputValueClass 等,在刚接触map/reduce编程的时候很容易由于 InputFormat,OutputFormat,OutputKeyClass,OutputValueClass在程序中的不正确导致程序出错,有时候试来试去没准能让程序跑的通 过,但是结果不一定如预期的那样,即使结果也如预期,但是也没有明白hadoop 的mapred框架内部到底是如何处理的细节,等下次编程的时候还是会面临相同的问题,所以干脆一下狠心,开始研究hadoop的源码了解其实现细节。做如下:

程序出错描述 
以一个mapreduce运行为例:
比如运行:
bin/hadoop jar hadoop-0.19.0-streaming.jar -input /home/luoli/input/goodman -output /home/luoli/output -mapper org.apache.hadoop.mapred.lib.IdentityMapper -reducer /usr/bin/wc
这其实是一个很简单的mapreduce运行instance,就是把一个读入到mapper中,mapper不做任何计算就把中 间结果传递给reducer,reducer对中间结果进行行,字和字节的统计,最后将统计信息输出到HDFS相应的目录下。但仅仅是这么简单的一个 mapreduce run instance,却提示出错,job运行失败,查看,记录的错误信息如下
.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:548)
    at org.apache.hadoop.mapred.lib.IdentityMapper.map(IdentityMapper.java:37)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
    at org.apache.hadoop.mapred.Child.main(Child.java:155)
这是几乎所有人在编写hadoop mapreduce程序时会遇到的最常见的错误,程序提示说是格式错误,出错的地方在Mapper从InputFormat中读取input并产生中间结 果时发生了格式错误,Mapper在collect时期待Key的类型是Text,而收到的确实LongWritable类型的key。
究其原 因,主要是因为,程序中没有明确指定要使用那种InputFormat来读取输入,hadoop就使用默认的TextInputFormat,通过查看源码,这个TextInputFormat是 扩展自FileInputFormat,
public class TextInputFormat extends FileInputFormat{...}
并且可以看出父类的范型中已经明确指定了 keyClass为LongWritable类型,而value Class为Text类型,并且这个InputFormat的实现为将输入文件的行数作为key,而将每一行的文本作为value。因此,在从文件到 mapper的过程中,数据就被分割成了的对,并传输给Mapper,这也就是说Mapper需要以同样的格式来接收来自InputFormat的数据。程序到这个时候都是没有 问题的。这里使用的Mapper实例是org.apache.hadoop.mapred.lib.IdentityMapper,这个Mapper的实 现是将读到的数据不做任何的处理,直接传给Reducer,问题在于:从IdentiryMapper类的实现代码来看,如下:
public void map(K key, V val,
                  OutputCollector output, Reporter reporter)
    throws IOException {
    output.collect(key, val);
  }
实际上它是按照接收到的对,并接受key 和value原本的类型类进行数据传递。因此Mapper进行分割的时候,要启动很多的Map的Task来分布的运行,到了MapTask后,会调用 MapTask的内部类MapOutputBuffer.collet()方法,问题就处在了这个方法里面。
从这个方法的代码来看:
public synchronized void collect(K key, V value)
        throws IOException {...}
key和value接收来以后仍然是保持了key和value原本的类型,这时仍然没有问题,但是方法内部,会判断key和 value的类型,是不是等于MapOutputBuffer中保存的keyClass和valueClass类型:
if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", recieved "
                              + key.getClass().getName());
      }
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", recieved "
                              + value.getClass().getName());
      }
而MapOutputBuffer保存的keyClass和 valueClass类型是在MapOutputBuffer初始化的时候被赋予的:
keyClass = (Class)job.getMapOutputKeyClass();
valClass = (Class)job.getMapOutputValueClass();
所以可以看出,实际上keyClass和 valueClass的class类型实际上是由这个job的JobConf来决定的,继续跟进这两个方法,可以发现,实际上 job.getMapOutputKeyClass()的代码实现为:
public Class getMapOutputKeyClass() {
    Class retv = getClass("mapred.mapoutput.key.class", null, Object.class);
    if (retv == null) {
      retv = getOutputKeyClass();
    }
    return retv;
  }
也就是说,keyClass的类型首先会读取mapred.mapoutput.key.class对应 的value值,这个类型可以在hadoop-site.xml中指定,可以在JobConf设置中在代码中指定,通常是后者,如果没有设置这个值(就是 上例的情况),那么代码中retv就会返回null,此时的代码逻辑是:将keyClass设置成JobConf的 getOutputKeyClass()的返回值,也就是在代码中设置的setOutputKeyClass()中指定的类型,所以如果程序出现以上这种 错误,在jobConf中指定OutputKeyClass就是一种的方法。回到这里,getOutputKeyClass()的代码实现为:
public Class getOutputKeyClass() {
    return getClass("mapred.output.key.class",
                    LongWritable.class, Object.class);
  }
从程序中就可以看出,它会采用 mapred.output.key.class指定的类类型为keyClass,如果仍然没有指定,就最终使用默认的LongWritable类型,这 就是上述程序出错的所在了。因为所有的判断都没有得到,所以最终使用了LongWritable
类型作为keyClass,所以就导致了 MapOutputBuffer在读取InputFormat中过来的数据,并准备向Reducer发送时,由于Reducer使用的是/usr/bin /wc程序,所以它期待的是文本的读入和输出,所以此时发生了类型不匹配的错误。

解决之道: 
在bin/hadoop jar hadoop-0.19.0-streaming.jar -input /home/luoli/input/goodman -output /home/luoli/output -mapper org.apache.hadoop.mapred.lib.IdentityMapper -reducer /usr/bin/wc命令后加上-jobconf 设置,并指定map的输出时准备的key类型是LongWritable类型的。-jobconf mapred.mapoutput.key.class=org.apache.hadoop.io.LongWritable,或者是-D  mapred.mapoutput.key.class=org.apache.hadoop.io.LongWritable.

总结: 
hadoop的mapreduce程序的数据流分成几个阶段,首 先是从原始输入文件到InputFormat,InputFormat根据它自己的实现和输入文件的格式为Mapper准备数据 (),然后这些数据被Mapper读入,并在map函数中进行处理,同样map中也会输出一定格式的中间结果,这些中 间结果也同样是的对,而且读入和输出的key和value类型不一定要相同。然后Reducer从Mapper的输出 中按照同样的格式读入数据,并在reduce方法中进行处理,最后按照OutputFormat的格式进行输出。这中间的任何一个过程如果出现了格式不匹 配或者无法转型,就会报出上述类似的错误,其实如果仔细查看自己的程序的数据流向中的格式是否匹配,通常就能解决问题了。
阅读(592) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~