Category: Big Data

2018-07-06 17:17:27


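The listing below is a MapReduce implementation of the "pairs" pattern for counting item co-occurrence: the mapper reads one comma-separated transaction per line, emits every pair of distinct product IDs on that line (ordered so the smaller ID comes first) with a count of 1, and the reducer sums the counts per pair.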

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyPairsJob {
    public static class MyPairsMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("map input:" + value.toString());
            String[] arrays = value.toString().split(",");
            for (int i = 0; i < arrays.length; i++) {          // walk the IDs on this line
                String curPrdId = arrays[i];
                for (int j = i + 1; j < arrays.length; j++) {  // pair with every later ID
                    String nextPrdId = arrays[j];
                    if (curPrdId.equals(nextPrdId)) {          // skip identical product IDs
                        continue;
                    }

                    String strKey = "[" + arrays[i] + "," + arrays[j] + "]"; // build the pair key
                    // Order the pair by numeric ID so that [1,2] and [2,1] become the same
                    // key and are not scattered across different reducers.
                    if (Integer.parseInt(curPrdId) > Integer.parseInt(nextPrdId)) {
                        strKey = "[" + arrays[j] + "," + arrays[i] + "]";
                    }
                    System.out.println("map out:" + strKey + ", 1");
                    context.write(new Text(strKey), new IntWritable(1));
                }
            }
        }
    }

    public static class MyPairsReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;                                       // total co-occurrences for this pair
            for (IntWritable val : values) {
                sum = sum + val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            Job job = Job.getInstance();
            job.setJobName("MyPairsJob");
            job.setJarByClass(MyPairsJob.class);
            job.setMapperClass(MyPairsMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setReducerClass(MyPairsReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setNumReduceTasks(1);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Delete any previous output directory so a rerun does not fail.
            FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);

            System.out.println(job.waitForCompletion(true));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
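As a quick sanity check with made-up sample data, an input file where each line lists the comma-separated product IDs of one transaction would produce tab-separated pair counts like this:

    Input:
    1,2,3,2
    2,3

    Output:
    [1,2]	2
    [1,3]	1
    [2,3]	3

The job takes the input directory as args[0] and the output directory as args[1]. One possible refinement, not done in the original job: since the reduce step is a plain associative sum, MyPairsReduce could also be registered as a combiner (job.setCombinerClass(MyPairsReduce.class)) to reduce shuffle volume.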
