Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2214261
  • 博文数量: 436
  • 博客积分: 9833
  • 博客等级: 中将
  • 技术积分: 5558
  • 用 户 组: 普通用户
  • 注册时间: 2010-09-29 10:27
文章存档

2013年(47)

2012年(79)

2011年(192)

2010年(118)

分类: 云计算

2011-09-24 12:56:32

mapreduce程序设计

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;                 

/*org.apache.hadoop.mapreduce.lib. 取代org.apache.hadoop.mapred.xxx,这里的改变让程序员修改代码时会更加的方便,比原来能够少写很多代码

the old API++++++++++++++++++++++++++++++++++++++++++++++++++

public static class MapClass extends MapReduceBase
implements Mapper {
public void map(K1 key, V1 value,
OutputCollector output,
Reporter reporter) throws IOException { }
}
public static class Reduce extends MapReduceBase
implements Reducer {
public void reduce(K2 key, Iterator values,
OutputCollector output,
Reporter reporter) throws IOException { }
}

The new API ++++++++++++++++++++++++++++++++++++++++++++++++
public static class MapClass extends Mapper {
public void map(K1 key, V1 value, Context context)
throws IOException, InterruptedException { }
}

public static class Reduce extends Reducer {
public void reduce(K2 key, Iterable values, Context context)
throws IOException, InterruptedException { }
}

*/
import org.apache.hadoop.util.ToolRunner;

public class tt extends Configured implements Tool {
public static class MapClass
extends Mapper {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] citation = value.toString().split(",");//split的作用是将该字符串里面的变量赋值给citation这个字符串数组当中。
context.write(new Text(citation[1]), new Text(citation[0]));  //使用新的API取代了collect相关的API,将map中的key和value进行了互换。
}
}
public static class Reduce extends Reducer {  //前两个参数设置是输入参数,后两个参数是输出参数。
public void reduce(Text key, Iterable values,
Context context)
throws IOException, InterruptedException {
String csv ="";

for (Text val:values) {//Text类型是类似于String类型的文本格式,但是在处理编码上还是和String有差别,与内存序列化有关,是hadoop经过封装之后的新类。
if (csv.length() > 0) csv += ",";
csv += val.toString();
}

context.write(key, new Text(csv));
}
}
public int run(String[] args) throws Exception {  //由hadoop本身调用该程序
Configuration conf = getConf();
Job job = new Job(conf, "tt"); //利用job取代了jobclient
job.setJarByClass(tt.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);  //此处如果不进行设置,系统会抛出异常,还要记住新旧API不能混用
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new tt(), args);    //调用新的类的方法免除配置的相关琐碎的细节
System.exit(res);
}
}

上面的代码在eclipse中是可以运行的,但是输入文件是hadoop in action中的文件cite75_99.TXT,

格式如下:

[root@asus input]# head -n 5 cite75_99.txt 
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384

我写的这个例子开始就是这样报错org.apache.hadoop.io.LongWritable cannot 
be cast to org.apache.hadoop.io.Text 然后按照上面的程序修改调用了新的API 就能够有效的将key的类型设置成Text,我用红颜色标记的部分是必须要这样写的 因为设置Text必须要在map reduce 和conf中同时设置才管用。我的邮箱是shenyanxxxy@qq.com 如果有hadoop的兴趣爱好者可以联系我 我们共同来商讨。


阅读(2001) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~