Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1108930
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2018-07-06 17:10:50


点击(此处)折叠或打开

  1. package com.hdp.design;

  2. import org.apache.hadoop.fs.FileSystem;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.MapWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  13. import java.io.IOException;
  14. import java.util.*;

  15. public class MyStripesJob {

  16.     /*
  17.     输入数据为用户购买的商品ID列表,具体如下
  18.     2,3,1,4,5,2,3
  19.     1,2,5,2
  20.     4,5
  21.     1,3,4,1
  22.     3,1
  23.     4,2,1,5,5,3
  24.     map input:2,3,1,4,5,2,3
  25.     1,[2,2]
  26.     1,[3,2]
  27.     1,[4,1]
  28.     1,[5,1]
  29.     2,[3,2]
  30.     2,[4,1]
  31.     2,[5,1]
  32.     2,[3,2]
  33.     2,[4,1]
  34.     2,[5,1]
  35.     3,[4,1]
  36.     3,[5,1]
  37.     3,[4,1]
  38.     3,[5,1]
  39.     4,[5,1]
  40.     map input:1,2,5,2
  41.     1,[2,2]
  42.     1,[5,1]
  43.     2,[5,1]
  44.     2,[5,1]
  45.     map input:4,5
  46.     4,[5,1]
  47.     map input:1,3,4,1
  48.     1,[3,1]
  49.     1,[4,1]
  50.     1,[3,1]
  51.     1,[4,1]
  52.     3,[4,1]
  53.     map input:3,1
  54.     1,[3,1]
  55.     map input:4,2,1,5,5,3
  56.     1,[2,1]
  57.     1,[3,1]
  58.     1,[4,1]
  59.     1,[5,2]
  60.     2,[3,1]
  61.     2,[4,1]
  62.     2,[5,2]
  63.     3,[4,1]
  64.     3,[5,2]
  65.     4,[5,2]
  66.      */
  67.     public static class MyStripesMapper extends Mapper<Object, Text, Text, MapWritable>{
  68.         @Override
  69.         protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  70.             String []arrays = value.toString().split(",");
  71.             System.out.println("map input:"+ value.toString());

  72.             //对输入数据排序,避免将同时购买商品[1,2]和[2,1]分别计数
  73.             List<String> list = Arrays.asList(arrays);
  74.             Collections.sort(list);
  75.             for (int i=0;i<list.size();i++){
  76.                 MapWritable mapWritable = new MapWritable();
  77.                 String curPrdId = list.get(i);
  78.                 for (int j=i+1;j<list.size();j++){
  79.                     String nextPrdId = list.get(j);
  80.                     Text prdIdKey = new Text(nextPrdId);
  81.                     if(curPrdId.equals(nextPrdId)){//过滤相同的商品ID
  82.                         continue;
  83.                     }

  84.                     //对购买商品ID次数累计
  85.                     if(mapWritable.get(prdIdKey) ==null){
  86.                         mapWritable.put(prdIdKey, new IntWritable(1));
  87.                     }else {
  88.                         IntWritable cnt = (IntWritable) mapWritable.get(prdIdKey);
  89.                         cnt.set(cnt.get()+1);
  90.                         mapWritable.put(prdIdKey,cnt);
  91.                     }
  92.                 }

  93.                 if(mapWritable.size()>0){
  94.                     System.out.println("map out:");
  95.                     for (MapWritable.Entry tmpMap:mapWritable.entrySet()){
  96.                         System.out.println(curPrdId+",["+tmpMap.getKey().toString() +","+ tmpMap.getValue()+"]");
  97.                     }
  98.                     context.write(new Text(curPrdId), mapWritable);
  99.                 }
  100.             }
  101.         }
  102.     }

  103.     /*
  104.     reduce 输出
  105.     [1,2]    5
  106.     [1,3]    6
  107.     [1,4]    4
  108.     [1,5]    4
  109.     [2,3]    5
  110.     [2,4]    3
  111.     [2,5]    6
  112.     [3,4]    4
  113.     [3,5]    4
  114.     [4,5]    4
  115.     *
  116.     * */
  117.     public static class MyStripesReduce extends Reducer<Text, MapWritable, Text,Text>{
  118.         @Override
  119.         protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {

  120.             System.out.println("reduce input:"+key+","+ values.toString());
  121.             HashMap<String, Integer> map = new HashMap<String, Integer>();
  122.              for (MapWritable value: values){
  123.                  for (Writable mapKey:value.keySet()){
  124.                     String strKey = mapKey.toString();
  125.                      //对收到的数据在次进行累加
  126.                      int num = Integer.parseInt(value.get(mapKey).toString());
  127.                     if(map.get(strKey)==null){
  128.                         map.put(strKey,num);
  129.                      }else {
  130.                         map.put(strKey, (Integer)map.get(strKey)+num);
  131.                      }
  132.                  }
  133.              }

  134.              for (Map.Entry<String, Integer> tmpMap : map.entrySet()){
  135.                  String outKey = "["+key.toString()+","+tmpMap.getKey()+"]";
  136.                  System.out.println("reduce out:"+ outKey+ ","+ String.valueOf(tmpMap.getValue()));
  137.                  context.write(new Text(outKey), new Text( String.valueOf(tmpMap.getValue())));
  138.              }

  139.         }
  140.     }

  141.     public static void main(String []args){

  142.         try {
  143.             Job job = Job.getInstance();
  144.             job.setJobName("MyStripesJob");
  145.             job.setJarByClass(MyStripesJob.class);
  146.             job.setMapperClass(MyStripesMapper.class);
  147.             job.setMapOutputKeyClass(Text.class);
  148.             job.setMapOutputValueClass(MapWritable.class);

  149.             job.setReducerClass(MyStripesReduce.class);
  150.             job.setOutputValueClass(Text.class);
  151.             job.setOutputKeyClass(Text.class);

  152.             job.setNumReduceTasks(1);

  153.             FileInputFormat.setInputPaths(job, new Path(args[0]));
  154.             FileOutputFormat.setOutputPath(job, new Path(args[1]));
  155.             FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);

  156.             System.out.println(job.waitForCompletion(true));
  157.         } catch (IOException e) {
  158.             e.printStackTrace();
  159.         } catch (InterruptedException e) {
  160.             e.printStackTrace();
  161.         } catch (ClassNotFoundException e) {
  162.             e.printStackTrace();
  163.         }

  164.     }
  165. }


阅读(1310) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~