Category: Cloud Computing

2013-01-06 20:00:27


Straight to the code: a small MapReduce job that reads a SequenceFile, splits each row on Hive's default field delimiter (\001), and counts how many rows share the same first field.


package ccindex.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestMR {

    // Mapper: the SequenceFile key is ignored; each value is one row of text.
    public static class MMap extends
            Mapper<BytesWritable, Text, Text, IntWritable> {

        private Text outKey = new Text();
        private IntWritable one = new IntWritable(1);

        @Override
        public void map(BytesWritable ikey, Text row, Context context)
                throws IOException, InterruptedException {
            // Emit the first field of the row (split on Hive's default
            // field delimiter \001) with a count of 1.
            outKey.set(row.toString().split("\001")[0]);
            context.write(outKey, one);
        }
    }

    // Reducer: sum the per-row counts for each key.
    public static class RReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable(0);

        @Override
        public void reduce(Text ikey, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            result.set(sum);
            context.write(ikey, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration(true);
        // Submit to the "hadmin" queue and use block-level output compression.
        config.set("mapred.job.queue.name", "hadmin");
        config.set("mapred.output.compression.type", "BLOCK");

        Job job = new Job(config);

        // The input is a SequenceFile (keys: BytesWritable, values: Text).
        job.setInputFormatClass(SequenceFileInputFormat.class);

        String input = args[0], output = args[1] + "/1/";

        // Serialization classes for the map -> reduce output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Serialization classes for the reduce -> HDFS output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(MMap.class);
        job.setReducerClass(RReduce.class);

        job.setNumReduceTasks(1);

        job.setJarByClass(TestMR.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
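
To try the job, the input directory needs a SequenceFile whose keys are BytesWritable and whose values are Text rows delimited by \001. The helper below is only a minimal sketch of how such a file could be generated; the class name MakeTestInput, the path /tmp/testmr/input/part-00000, and the sample rows are made up for illustration and are not part of the original post.

package ccindex.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MakeTestInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical input location; adjust to your own cluster layout.
        Path path = new Path("/tmp/testmr/input/part-00000");

        SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, path, BytesWritable.class, Text.class);
        try {
            // Each value is one row: fields separated by \001; the job
            // above counts occurrences of the first field (u1, u2, ...).
            writer.append(new BytesWritable("k1".getBytes()), new Text("u1\001a\001b"));
            writer.append(new BytesWritable("k2".getBytes()), new Text("u1\001c\001d"));
            writer.append(new BytesWritable("k3".getBytes()), new Text("u2\001e\001f"));
        } finally {
            writer.close();
        }
    }
}

After packaging the classes into a jar (say testmr.jar, a name chosen here only for illustration), the job is submitted in the usual way with hadoop jar testmr.jar ccindex.mapreduce.TestMR /tmp/testmr/input /tmp/testmr/out; note that the results land under /tmp/testmr/out/1/, because main appends "/1/" to the second argument.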

