Category: Cloud Computing
2011-09-24 12:56:32
MapReduce Programming
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
/* org.apache.hadoop.mapreduce.lib.* replaces org.apache.hadoop.mapred.*; the change means much
   less boilerplate to write and makes later modifications easier.

   The old API ++++++++++++++++++++++++++++++++++++++++++++++++++
   public static class MapClass extends MapReduceBase
       implements Mapper<K1, V1, K2, V2> {
       public void map(K1 key, V1 value,
                       OutputCollector<K2, V2> output,
                       Reporter reporter) throws IOException { }
   }
   public static class Reduce extends MapReduceBase
       implements Reducer<K2, V2, K3, V3> {
       public void reduce(K2 key, Iterator<V2> values,
                          OutputCollector<K3, V3> output,
                          Reporter reporter) throws IOException { }
   }

   The new API ++++++++++++++++++++++++++++++++++++++++++++++++
   public static class MapClass extends Mapper<K1, V1, K2, V2> {
       public void map(K1 key, V1 value, Context context)
           throws IOException, InterruptedException { }
   }
   public static class Reduce extends Reducer<K2, V2, K3, V3> {
       public void reduce(K2 key, Iterable<V2> values, Context context)
           throws IOException, InterruptedException { }
   }
*/
import org.apache.hadoop.util.ToolRunner;
public class tt extends Configured implements Tool {
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split() breaks the input line into its fields and stores them in the citation array
            String[] citation = value.toString().split(",");
            // Context.write() replaces the old collect()-style calls; key and value are swapped here,
            // so the cited patent becomes the key and the citing patent the value
            context.write(new Text(citation[1]), new Text(citation[0]));
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String csv = "";
            // Text is Hadoop's own writable text type; it behaves much like String,
            // but differs in how it handles encoding and in-memory serialization
            for (Text val : values) {
                if (csv.length() > 0) csv += ",";
                csv += val.toString();
            }
            context.write(key, new Text(csv));
        }
    }
    public int run(String[] args) throws Exception { // invoked through ToolRunner in main()
        Configuration conf = getConf();
        Job job = new Job(conf, "tt"); // Job replaces the old JobClient
        job.setJarByClass(tt.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class); // without these two calls the framework throws an exception; also remember that the old and new APIs must never be mixed
        return job.waitForCompletion(true) ? 0 : 1; // hand the status back to ToolRunner instead of calling System.exit() inside run()
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new tt(), args); // ToolRunner takes care of the standard configuration plumbing
        System.exit(res);
    }
}
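To run it outside Eclipse, the class can be packaged into a jar and submitted with the hadoop command. This is only a sketch; the jar name and the HDFS paths are placeholders:

hadoop jar tt.jar tt /user/root/input/cite75_99.txt /user/root/output

Because the driver goes through ToolRunner, generic options such as -D property=value can also be passed on the command line.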
The code above runs fine in Eclipse. The input file is cite75_99.txt from Hadoop in Action, which looks like this:
[root@asus input]# head -n 5 cite75_99.txt
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
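Each input line is a CITING,CITED pair. The mapper swaps the two fields so that the cited patent becomes the key, and the reducer joins all citing patents of the same cited patent into one comma-separated value. For the sample lines above, the output (TextOutputFormat separates key and value with a tab) would look roughly like this:

1324234	3858241
3398406	3858241
3557384	3858241
956203	3858241

With the full file, a patent cited by several patents gets all of its citers joined into a comma-separated list in the value column. Note that the header line is processed like any other record, so the real output also contains one extra line derived from "CITING","CITED".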
When I first wrote this example it kept failing with the error "org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text". After rewriting it against the new API as shown above, the key type can be set to Text correctly. The type declarations are the crucial part: Text has to be specified consistently in the Mapper, in the Reducer, and in the job configuration, otherwise the setting does not take effect. My email is shenyanxxxy@qq.com; anyone interested in Hadoop is welcome to contact me so we can discuss it together.
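One more note on that ClassCastException: in the new API, setOutputKeyClass() and setOutputValueClass() describe the reducer's output and, unless overridden, are also assumed for the map output. If a job's map output types ever differ from its final output types, they have to be declared separately; a minimal sketch (not needed for this particular job, since both stages emit Text/Text):

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);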