本次记录Hadoop的MapReduce的java编程方法
MapReduce MapReduce本质上是Devide and parallel conquer
执行过程见google mapreduce论文3.1节。
输入数据集的划分是分布式文件系统的事情。
MapReduce处理的是并行任务的调度和执行流程。
开发者负责数据集的处理算法+结果的归并算法。
结果数据集的存储还是分布式文件系统的事情。
上面的图有点混淆,好像MapReduce是作为一个平台运行的,实际情况并不是这样。
目前MapReduce的实现为lib,本质是一种框架,用户程序必须包含MapReduce和文件系统的lib。
Hadoop的MapReduce的实现也是lib,这适合Java语言本身的特点,可以想象执行过程一层一层的调用下去。
WordCount 先学习一下最简单的代码WordCount。
在看代码之前,想想自己设计一个在N台机器上实现并行单词计数算法,作为本次学习的出发点。
- 输入集划分为N份,{1,2,……,x,……,N}
- 每台机器获得1份输入,分别并行计算x中的单词计数
- 计算结果归并为一个结果,使用排序或者Hash合并相同的单词,累加各个计数和,得到最终该单词的计数。
- package org.apache.hadoop.examples;
-
-
import java.io.IOException;
-
import java.util.StringTokenizer;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.util.GenericOptionsParser;
-
-
public class WordCount {
-
-
public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
private final static IntWritable one = new IntWritable(1);
-
private Text word = new Text();
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
StringTokenizer itr = new StringTokenizer(value.toString());
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
}
-
}
-
}
-
-
public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
private IntWritable result = new IntWritable();
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
-
}
-
-
public static void main(String[] args) throws Exception {
-
Configuration conf = new Configuration();
-
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-
if (otherArgs.length != 2) {
-
System.err.println("Usage: wordcount ");
-
System.exit(2);
-
}
-
Job job = new Job(conf, "word count");
-
job.setJarByClass(WordCount.class);
-
job.setMapperClass(TokenizerMapper.class);
-
job.setCombinerClass(IntSumReducer.class);
-
job.setReducerClass(IntSumReducer.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(IntWritable.class);
-
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-
System.exit(job.waitForCompletion(true) ? 0 : 1);
-
}
-
}
Mapper
Class Mapper
实现从处理初始输入得到中间结果。
- public static class TokenizerMapper
-
extends Mapper<Object, Text, Text, IntWritable>{
-
-
private final static IntWritable one = new IntWritable(1);
-
private Text word = new Text();
-
-
public void map(Object key, Text value, Context context
-
) throws IOException, InterruptedException {
-
StringTokenizer itr = new StringTokenizer(value.toString());
-
while (itr.hasMoreTokens()) {
-
word.set(itr.nextToken());
-
context.write(word, one);
-
}
-
}
-
}
map()对输入的所有文本中的单词,以单词为key,该单词的计数为value,生成中间结果,每个单词计数是1。
从代码上看,输入的Object key没有用到。
输出的Text word不是常规理解的唯一key,因为算法没有合并相同的单词,word可能有重复。
Reducer
Class Reducer
合并中间结果到最终结果。
这个过程分为3个阶段:
- 获取Mapper的中间结果
- 将中间结果中的values按key划分group,而group按照key排序。形成了的结构,此时key是唯一的。
- 处理group中的所有values,得到一个value。此时key对应的value唯一,序对形成。
- public static class IntSumReducer
-
extends Reducer<Text,IntWritable,Text,IntWritable> {
-
private IntWritable result = new IntWritable();
-
-
public void reduce(Text key, Iterable<IntWritable> values,
-
Context context
-
) throws IOException, InterruptedException {
-
int sum = 0;
-
for (IntWritable val : values) {
-
sum += val.get();
-
}
-
result.set(sum);
-
context.write(key, result);
-
}
-
}
为input,为outpu。reduce()的输入为第二阶段的,也就是以单词为key,后面跟上x个1的集合,只要数出x。这个时候回头看看Mapper,就不难理解为什么不在Mapper里合并中间结果子集了?答案是合并是reduce的事情,第二阶段包含了中间结果子集。reduce的第二阶段和第三阶段都是合并算法,因此有人也称该过程为merger。
本次结语
MapReduce提供了简单高效的多节点并行处理框架,简化了开发者的工作,开发者仅仅关注处理算法Map+归并算法Reduce。
只需要简单的学习一些Hadoop的lib和api,开发者即可进行并行算法的开发,无需关注存储的细节和进程调度。
下一节,将关注MapReduce的输入集和输出集的数据分布。
阅读(1612) | 评论(0) | 转发(0) |