Chinaunix首页 | 论坛 | 博客
  • 博客访问: 598106
  • 博文数量: 83
  • 博客积分: 5453
  • 博客等级: 大校
  • 技术积分: 894
  • 用 户 组: 普通用户
  • 注册时间: 2006-06-10 13:51
文章分类

全部博文(83)

文章存档

2015年(2)

2014年(1)

2013年(1)

2012年(3)

2011年(14)

2010年(7)

2009年(7)

2008年(12)

2007年(17)

2006年(19)

我的朋友

分类: 服务器与存储

2011-09-27 13:30:11

本次记录Hadoop的MapReduce的java编程方法

MapReduce

MapReduce本质上是Devide and parallel conquer

执行过程见google mapreduce论文3.1节。

输入数据集的划分是分布式文件系统的事情。

MapReduce处理的是并行任务的调度和执行流程。

开发者负责数据集的处理算法+结果的归并算法。

结果数据集的存储还是分布式文件系统的事情。

mapreduce 

上面的图有点混淆,好像MapReduce是作为一个平台运行的,实际情况并不是这样。

目前MapReduce的实现为lib,本质是一种框架,用户程序必须包含MapReduce和文件系统的lib。

Hadoop的MapReduce的实现也是lib,这适合Java语言本身的特点,可以想象执行过程一层一层的调用下去。

 

WordCount

先学习一下最简单的代码WordCount。

在看代码之前,想想自己设计一个在N台机器上实现并行单词计数算法,作为本次学习的出发点。

  1. 输入集划分为N份,{1,2,……,x,……,N}
  2. 每台机器获得1份输入,分别并行计算x中的单词计数
  3. 计算结果归并为一个结果,使用排序或者Hash合并相同的单词,累加各个计数和,得到最终该单词的计数。
  1. package org.apache.hadoop.examples;

  2. import java.io.IOException;
  3. import java.util.StringTokenizer;

  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.util.GenericOptionsParser;

  14. public class WordCount {

  15.   public static class TokenizerMapper
  16.        extends Mapper<Object, Text, Text, IntWritable>{
  17.     
  18.     private final static IntWritable one = new IntWritable(1);
  19.     private Text word = new Text();
  20.       
  21.     public void map(Object key, Text value, Context context
  22.                     ) throws IOException, InterruptedException {
  23.       StringTokenizer itr = new StringTokenizer(value.toString());
  24.       while (itr.hasMoreTokens()) {
  25.         word.set(itr.nextToken());
  26.         context.write(word, one);
  27.       }
  28.     }
  29.   }
  30.   
  31.   public static class IntSumReducer
  32.        extends Reducer<Text,IntWritable,Text,IntWritable> {
  33.     private IntWritable result = new IntWritable();

  34.     public void reduce(Text key, Iterable<IntWritable> values,
  35.                        Context context
  36.                        ) throws IOException, InterruptedException {
  37.       int sum = 0;
  38.       for (IntWritable val : values) {
  39.         sum += val.get();
  40.       }
  41.       result.set(sum);
  42.       context.write(key, result);
  43.     }
  44.   }

  45.   public static void main(String[] args) throws Exception {
  46.     Configuration conf = new Configuration();
  47.     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  48.     if (otherArgs.length != 2) {
  49.       System.err.println("Usage: wordcount ");
  50.       System.exit(2);
  51.     }
  52.     Job job = new Job(conf, "word count");
  53.     job.setJarByClass(WordCount.class);
  54.     job.setMapperClass(TokenizerMapper.class);
  55.     job.setCombinerClass(IntSumReducer.class);
  56.     job.setReducerClass(IntSumReducer.class);
  57.     job.setOutputKeyClass(Text.class);
  58.     job.setOutputValueClass(IntWritable.class);
  59.     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  60.     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  61.     System.exit(job.waitForCompletion(true) ? 0 : 1);
  62.   }
  63. }

Mapper

Class Mapper

实现从处理初始输入得到中间结果

  1.   public static class TokenizerMapper
  2.        extends Mapper<Object, Text, Text, IntWritable>{
  3.     
  4.     private final static IntWritable one = new IntWritable(1);
  5.     private Text word = new Text();
  6.       
  7.     public void map(Object key, Text value, Context context
  8.                     ) throws IOException, InterruptedException {
  9.       StringTokenizer itr = new StringTokenizer(value.toString());
  10.       while (itr.hasMoreTokens()) {
  11.         word.set(itr.nextToken());
  12.         context.write(word, one);
  13.       }
  14.     }
  15.   }

为input,为output。

map()对输入的所有文本中的单词,以单词为key,该单词的计数为value,生成中间结果,每个单词计数是1。

从代码上看,输入的Object key没有用到。

输出的Text word不是常规理解的唯一key,因为算法没有合并相同的单词,word可能有重复。

 

Reducer

Class Reducer

合并中间结果到最终结果

这个过程分为3个阶段:

  1. 获取Mapper的中间结果
  2. 将中间结果中的values按key划分group,而group按照key排序。形成了的结构,此时key是唯一的。
  3. 处理group中的所有values,得到一个value。此时key对应的value唯一,序对形成。
  1. public static class IntSumReducer
  2.        extends Reducer<Text,IntWritable,Text,IntWritable> {
  3.     private IntWritable result = new IntWritable();

  4.     public void reduce(Text key, Iterable<IntWritable> values,
  5.                        Context context
  6.                        ) throws IOException, InterruptedException {
  7.       int sum = 0;
  8.       for (IntWritable val : values) {
  9.         sum += val.get();
  10.       }
  11.       result.set(sum);
  12.       context.write(key, result);
  13.     }
  14.   }
为input,为outpu。reduce()的输入为第二阶段的,也就是以单词为key,后面跟上x个1的集合,只要数出x。这个时候回头看看Mapper,就不难理解为什么不在Mapper里合并中间结果子集了?答案是合并是reduce的事情,第二阶段包含了中间结果子集。reduce的第二阶段和第三阶段都是合并算法,因此有人也称该过程为merger。  

本次结语

MapReduce提供了简单高效的多节点并行处理框架,简化了开发者的工作,开发者仅仅关注处理算法Map+归并算法Reduce。

只需要简单的学习一些Hadoop的lib和api,开发者即可进行并行算法的开发,无需关注存储的细节和进程调度。

下一节,将关注MapReduce的输入集和输出集的数据分布。

阅读(1612) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~