Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1108627
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-06-15 18:41:49

总结:
  1、当reduce任务数(numReduceTasks)设置为1时,自定义的分区函数不会被调用——只有一个分区时无需计算分区号,这可以理解。
  2、在实现compare 函数时要注意:两个对象的firstKey值不相等时要直接返回比较结果,只有两者相等时,才比较secondKey。自己在实现时没有注意到这个问题,随便写了,调试了半天都没有发现问题。
      错误写法如下:
     

点击(此处)折叠或打开

  1.         public int compare(WritableComparable a, WritableComparable b){ // INCORRECT version, kept on purpose as the counter-example

  2.             SecondSortKey keyA = (SecondSortKey)a;
  3.             SecondSortKey keyB = (SecondSortKey)b;
  4.             //System.out.println("SecondSortComparator , keyA:<"+keyA.getFirstKey() +","+ keyA.getSecondKey() +"> keyB:<"+keyB.getFirstKey() +","+ keyB.getSecondKey()+">");
  5.             int result = keyA.getFirstKey().compareTo(keyB.getFirstKey());
  6.             if(result!=0){ // BUG: condition is inverted - this branch runs when the first keys DIFFER
  7.                 result = keyA.getSecondKey().compareTo(keyB.getSecondKey()); // overwrites the first-key ordering with the second-key ordering
  8.             }
  9.             return result; // when the first keys are equal this is 0, so the second key is never compared
  10.         }
  3、写SortComparator和GroupComparator类时,要注意实现构造函数(在其中调用super(SecondSortKey.class,true)),否则会报空指针异常:
        
// Constructor that hands the key class to the parent comparator.
// The surrounding article reports a NullPointerException when this constructor is omitted.
// NOTE(review): the `true` argument presumably tells the parent to create key instances
// for deserialization - confirm against the Hadoop WritableComparator API.
protected SecondSortComparator(){
  1.             super(SecondSortKey.class,true);
  2.         }
1、自定义组合键

点击(此处)折叠或打开

  1. }

  2.     @Override
  3.     public boolean equals(Object o){
  4.         if(!(o instanceof SecondSortKey)){
  5.            return false;
  6.         }

  7.         SecondSortKey sortKey =(SecondSortKey) o;
  8.         return this.firstKey.equals(sortKey.getFirstKey());
  9.     }

  10.     @Override
  11.     public int hashCode(){
  12.        return this.firstKey.hashCode();
  13.     }
  14.     @Override
  15.     public void write(DataOutput out) throws IOException {
  16.         this.firstKey.write(out);
  17.         this.secondKey.write(out);
  18.     }

  19.     @Override
  20.     public void readFields(DataInput in) throws IOException {
  21.         this.firstKey.readFields(in);
  22.         this.secondKey.readFields(in);
  23.     }
2、自定义分区

点击(此处)折叠或打开

  1. public static class SecondSortPartitioner extends Partitioner<SecondSortKey, IntWritable> {
  2.         @Override
  3.         public int getPartition(SecondSortKey key, IntWritable value, int numPartitions) {
  4.             return (key.getFirstKey().hashCode())%numPartitions;
  5.         }
  6.     }
3、自定义排序

点击(此处)折叠或打开

  1. public static class SecondSortComparator extends WritableComparator {

  2.         protected SecondSortComparator(){
  3.             super(SecondSortKey.class,true);
  4.         }

  5.         @Override
  6.         public int compare(WritableComparable a, WritableComparable b){

  7.             SecondSortKey keyA = (SecondSortKey)a;
  8.             SecondSortKey keyB = (SecondSortKey)b;
  9.             //System.out.println("SecondSortComparator , keyA:<"+keyA.getFirstKey() +","+ keyA.getSecondKey() +"> keyB:<"+keyB.getFirstKey() +","+ keyB.getSecondKey()+">");
  10.             int result = keyA.getFirstKey().compareTo(keyB.getFirstKey());
  11.             if(result!=0){
  12.                return result;
  13.             }else {
  14.                 result = keyA.getSecondKey().compareTo(keyB.getSecondKey());
  15.             }
  16.             return result;
  17.         }
  18.     }
4、自定义分组排序

点击(此处)折叠或打开

  1. public static class SecondSortGroupComparator extends WritableComparator {
  2.         protected SecondSortGroupComparator(){
  3.             super(SecondSortKey.class,true);
  4.         }
  5.         @Override
  6.         public int compare(WritableComparable a, WritableComparable b){
  7.             SecondSortKey keyA = (SecondSortKey) a;
  8.             SecondSortKey keyB = (SecondSortKey) b;
  9.             System.out.println("SecondSortGroupComparator , keyA:<"+keyA.getFirstKey() +","+ keyA.getSecondKey() +"> keyB:<"+keyB.getFirstKey() +","+ keyB.getSecondKey()+">");
  10.             int result = keyA.getFirstKey().compareTo(keyB.getFirstKey());
  11.             if(result!=0){
  12.               return result;
  13.             }else {
  14.                 result = keyA.getSecondKey().compareTo(keyB.getSecondKey());
  15.             }

  16.             return result;
  17.         }
  18.     }
5、实现map

点击(此处)折叠或打开

  1. public static class SecondSortMapper extends Mapper<LongWritable, Text, SecondSortKey, IntWritable>{
  2.         @Override
  3.         protected void map(LongWritable key, Text value,
  4.                            Context context) throws IOException, InterruptedException {
  5.             String strValue = value.toString();
  6.             if(strValue==null||strValue.length()==0){
  7.                 return;
  8.             }

  9.             String []array = strValue.split("\t");
  10.            // System.out.println("map recv data:"+array[0]+" -- "+ array[1]);
  11.             SecondSortKey secondSortKey = new SecondSortKey(new Text(array[0].trim()),new IntWritable(Integer.parseInt(array[1].trim())));
  12.             context.write(secondSortKey, new IntWritable(Integer.parseInt(array[1].trim())));
  13.         }
  14.     }
6、reduce实现

点击(此处)折叠或打开

  1. public static class SecondSortReducer extends Reducer<SecondSortKey, IntWritable, NullWritable, Text>{
  2.          @Override
  3.         protected void reduce(SecondSortKey key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  4.             for (IntWritable value: values){
  5.                 String data = key.getFirstKey()+" "+ String.valueOf(value.get());
  6.                 context.write(NullWritable.get(), new Text(data));
  7.             }
  8.         }
  9.     }
7、主程序

点击(此处)折叠或打开

  1. public static void main(String []args){
  2.         try {
  3.             Job job = Job.getInstance();
  4.             job.setJobName("SecondSort");
  5.             job.setJarByClass(SecondSortJob.class);
  6.             String []arrays = new GenericOptionsParser(args).getRemainingArgs();
  7.             FileInputFormat.setInputPaths(job,new Path(arrays[0]));

  8.             FileSystem fs = FileSystem.get(new URI(arrays[1]),job.getConfiguration());
  9.             if(fs.exists(new Path(arrays[1]))){
  10.                 //System.out.println("del:"+ arrays[1]);
  11.                 fs.delete(new Path(arrays[1]),true);
  12.             }
  13.             FileOutputFormat.setOutputPath(job,new Path(arrays[1]));

  14.             job.setMapperClass(SecondSortMapper.class);
  15.             job.setMapOutputKeyClass(SecondSortKey.class);
  16.             job.setMapOutputValueClass(IntWritable.class);

  17.             job.setReducerClass(SecondSortReducer.class);
  18.             job.setOutputKeyClass(NullWritable.class);
  19.             job.setOutputValueClass(Text.class);

  20.             job.setPartitionerClass(SecondSortPartitioner.class);
  21.             job.setSortComparatorClass(SecondSortComparator.class);
  22.             job.setGroupingComparatorClass(SecondSortGroupComparator.class);

  23. // job.setInputFormatClass(FileInputFormat.class);
  24. // job.setOutputFormatClass(FileOutputFormat.class);

  25.             job.setNumReduceTasks(1);
  26.             boolean ret= job.waitForCompletion(true);
  27.             if(ret){
  28.                 System.out.println("succ");
  29.             }else {
  30.                 System.out.println("fail");
  31.             }

  32.         } catch (IOException e) {
  33.             e.printStackTrace();
  34.         } catch (InterruptedException e) {
  35.             e.printStackTrace();
  36.         } catch (ClassNotFoundException e) {
  37.             e.printStackTrace();
  38.         } catch (URISyntaxException e) {
  39.             e.printStackTrace();
  40.         }

  41.     }

输入:
A   5  
B   3  
B   8  
B   5  
A   2  
C   2
C   8  
C   5 

输出:
A   2  
A   5  
B   3  
B   5  
B   8 
C   2
C   5  
C   8 









阅读(786) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~