Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1088315
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-06-25 20:13:56

 TopN算法是一个经典的算法,由于每个map都只是实现了本地的TopN算法,而假设map有M个,在归约的阶段只有M x N个,这个结果是可以接受的并不会造成性能瓶颈。
这个TopN算法在map阶段将使用TreeMap来实现排序,以到达可伸缩的目的。
实现步骤:
  1、在mapper中定义TreeMap 将输入数据按put到TreeMap中,如果TreeMap的size大于N,需要移除最小的数据
  2、在以往的mapper中,我们都是处理一条数据之后就context.write或者output.collector一次。而在这里是把所有数据处理完之后再进行写入。所以,我们可以把这个context.write放在cleanup里执行。
    cleanup就是整个mapper task执行完之后会执行的一个函数。
  3、在reduce中定义TreeMap 将输入数据按put到TreeMap中,对Mapper输出的数据在进行一次汇总,选出其中的Top K

具体实现如下:

点击(此处)折叠或打开

  1. import org.apache.hadoop.fs.Path;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.Reducer;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import org.apache.hadoop.util.GenericOptionsParser;

  11. import java.io.IOException;
  12. import java.util.Comparator;
  13. import java.util.TreeMap;

  14. public class TopTenJob {
  15.     public static class TopTenMapper extends Mapper<LongWritable ,Text, NullWritable, Text>{
  16.         private static TreeMap<Long, String> map = new TreeMap<Long,String >();
  17.         @Override
  18.         public void map(LongWritable key, Text value, Context context){

  19.             String strValue = value.toString();
  20.             String []arrays = strValue.split("\t");

  21.             map.put(Long.parseLong(arrays[1]), strValue);
  22.             if(map.size()>10){
  23.                 map.remove(map.firstKey());
  24.             }
  25.         }

  26.         @Override
  27.         protected void cleanup(Context context
  28.         ) throws IOException, InterruptedException {
  29.           for (String str: map.values())
  30.               context.write(NullWritable.get(), new Text(str));
  31.         }
  32.     }


  33.     public static class TopTenReduce extends Reducer<NullWritable, Text, NullWritable, Text>{
  34.         private static TreeMap<Long, String> topTenMap = new TreeMap<>(new Comparator() {
  35.             @Override
  36.             public int compare(Object o1, Object o2) {
  37.                 Long lo1 = (Long)o1;
  38.                 Long lo2 = (Long)o2;
  39.                 return lo2.compareTo(lo1);
  40.             }
  41.         });
  42.         @Override
  43.         public void reduce (NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  44.             for(Text text:values){
  45.                 String value = text.toString();
  46.                 String [] arrays = value.split("\t");

  47.                 topTenMap.put(Long.parseLong(arrays[1]),value);
  48.                 if(topTenMap.size()>10){
  49.                     topTenMap.remove(topTenMap.firstKey() );
  50.                 }
  51.             }

  52.             for (String strVal : topTenMap.values()){
  53.                 context.write(NullWritable.get(), new Text(strVal));
  54.             }
  55.         }

  56.     }

  57.     public static void main(String []args){

  58.         Job job = null;
  59.         try {
  60.             job = Job.getInstance();
  61.             job.setJobName("TopTenJob");
  62.             job.setJarByClass(TopTenJob.class);

  63.             job.setMapperClass(TopTenMapper.class);
  64.             job.setMapOutputKeyClass(NullWritable.class);
  65.             job.setOutputValueClass(Text.class);

  66.             job.setReducerClass(TopTenReduce.class);
  67.             job.setOutputKeyClass(NullWritable.class);
  68.             job.setOutputValueClass(Text.class);

  69.             String []arrays = new GenericOptionsParser(args).getRemainingArgs();
  70.             FileInputFormat.addInputPath(job, new Path(arrays[0]));
  71.             FileOutputFormat.setOutputPath(job, new Path(arrays[1]));

  72.             job.setNumReduceTasks(1);

  73.             System.out.println(job.waitForCompletion(true));
  74.         } catch (IOException e) {
  75.             e.printStackTrace();
  76.         } catch (InterruptedException e) {
  77.             e.printStackTrace();
  78.         } catch (ClassNotFoundException e) {
  79.             e.printStackTrace();
  80.         }

  81.     }
  82. }


阅读(1224) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~