Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1113525
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-06-19 23:14:11

MapReduce默认输出是在单个reduce中以key排序的,多个reduce输出之间是不排序的,

所谓全排序,就是指多个reduce之间的输出是有序的。

实现全排序有三种方法:

1、  使用一个reduce进行输出

MapReduce 默认是以key排序的,使用一个reduce,就可以实现全排序。

这种方法简单,但不能充分利用集群的计算资源,而且在数据量很大的情况下,很有可能会出现OOM问题。

2、  自定义分区函数实现全排序

MapReduce默认的分区函数是HashPartitioner,其实现的原理是计算map输出key的 hashCode ,然后对Reduce个数求模,这样只要求模结果一样的Key都会发送到同一个Reduce。我们可以根据自己的业务需求,自定义分区函数,将Key按范围划分,将某个范围内的值,划分到一个reduce中,如

·       所有 Key < 10000 的数据都发送到Reduce 0;

·      所有 10000 < Key < 20000 的数据都发送到Reduce 1;

·      其余的Key都发送到Reduce 2;

这就实现了Reduce 0的数据一定全部小于Reduce 1,且Reduce 1的数据全部小于Reduce 2,再加上同一个Reduce里面的数据局部有序,这样就实现了数据的全局有序

   这种方法,虽然能够将数据分散到多个Reduce中,但是,必须手动地找到各个Reduce的分界点,尽量使得分散到每个Reduce的数据量均衡。而且每次修改Reduce的个数时,都得手动去找一次Key的分界点!非常不灵活

3、  使用TotalOrderPartitioner进行全排序

Hadoop内置有个 HashPartitioner 分区实现类,MapReduce默认就是使用它;但Hadoop内置还有个名为 TotalOrderPartitioner 的分区实现类,这个就是解决全排序的问题。也就是根据Key的分界点将不同的Key发送到相应的分区。

而这里用到的分界点是由程序解决的,而不是人工手工分区的

第一种方法,简单,就不实现了。
第二种方法实现,步骤如下:
1、自定义key,实现compareTo
2、自定义partitioner,应该清楚整个Key的索引范围
具体实现如下:

注意点:
1、自定义key是需要定义,默认构造函数,否则在运行是会报错
2、自定义partitioner 时,key和value 类型要和map输出的key和value的类型必须一致

点击(此处)折叠或打开

  1. public class AllSortKey implements WritableComparable<AllSortKey> {
  2.     private Text firstKey = new Text();
  3.     private IntWritable secondKey= new IntWritable();

  4.     public IntWritable getSecondKey() {
  5.         return secondKey;
  6.     }

  7.     public void setSecondKey(IntWritable secondKey) {
  8.         this.secondKey = secondKey;
  9.     }

  10.     public Text getFirstKey() {
  11.         return firstKey;
  12.     }

  13.     public void setFirstKey(Text firstKey) {
  14.         this.firstKey = firstKey;
  15.     }


  16.     public AllSortKey(){

  17.     }
  18.     public AllSortKey(Text firstKey, IntWritable secondKey){
  19.         this.firstKey = firstKey;
  20.         this.secondKey =secondKey;
  21.     }


  22.     @Override
  23.     public int compareTo(AllSortKey o) {
  24.         int ret = this.firstKey.compareTo(o.firstKey);
  25.         if(ret!=0){
  26.             return ret;
  27.         }else {
  28.             ret = this.secondKey.compareTo(o.secondKey);
  29.         }
  30.         return ret;
  31.     }

  32.     @Override
  33.     public void write(DataOutput out) throws IOException {
  34.         this.firstKey.write(out);
  35.         this.secondKey.write(out);
  36.     }

  37.     @Override
  38.     public void readFields(DataInput in) throws IOException {
  39.         this.firstKey.readFields(in);
  40.         this.secondKey.readFields(in);
  41.     }
  42. }

点击(此处)折叠或打开

  1. public class AllSortV2Job {
  2.     public static void main(String []args){
  3.         try {
  4.             Job job = Job.getInstance();
  5.             job.setJobName("AllSortV2Job");
  6.             job.setJarByClass(AllSortV2Job.class);

  7.             String []arrays = new GenericOptionsParser(args).getRemainingArgs();
  8.             FileInputFormat.setInputPaths(job,new Path(arrays[0]));
  9.             FileOutputFormat.setOutputPath(job,new Path(arrays[1]));

  10.             FileSystem fs = FileSystem.get(new URI( arrays[1]),job.getConfiguration());
  11.             if(fs.exists(new Path(arrays[1]))){
  12.                 fs.delete(new Path(arrays[1]),true);
  13.             }

  14.             job.setMapperClass(AllSortV2Mapper.class);
  15.             job.setMapOutputKeyClass(AllSortKey.class);
  16.             job.setMapOutputValueClass(Text.class);

  17.             job.setReducerClass(AllSortV2Reduce.class);
  18.             job.setOutputKeyClass(NullWritable.class);
  19.             job.setOutputValueClass(Text.class);

  20.             job.setPartitionerClass(AllSortV2Partitioner.class);

  21.             job.setNumReduceTasks(2);
  22.             boolean ret = job.waitForCompletion(true);
  23.             System.out.println("ret:"+ret);

  24.         } catch (IOException e) {
  25.             e.printStackTrace();
  26.         } catch (InterruptedException e) {
  27.             e.printStackTrace();
  28.         } catch (ClassNotFoundException e) {
  29.             e.printStackTrace();
  30.         } catch (URISyntaxException e) {
  31.             e.printStackTrace();
  32.         }

  33.     }
  34.     public static class AllSortV2Mapper extends Mapper<LongWritable ,Text , AllSortKey, Text>{
  35.         @Override
  36.         public void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
  37.             String strValue = value.toString();
  38.             System.out.println("strValue:"+strValue);
  39.             if(strValue==null||strValue.length()==0){
  40.                 return;
  41.             }

  42.             String []array = strValue.split("\t");
  43.             System.out.println("array0:"+array[0] +" array[1]"+array[1]);
  44.             AllSortKey allSortKey = new AllSortKey( new Text(array[0]), new IntWritable(Integer.parseInt(array[1])));
  45.             context.write(allSortKey, new Text(array[1]));
  46.         }
  47.     }

  48.     public static class AllSortV2Partitioner extends Partitioner<AllSortKey, Text>{

  49.         @Override
  50.         public int getPartition(AllSortKey allSortKey, Text text, int numPartitions) {

  51.             int index = allSortKey.getFirstKey().toString().toCharArray()[0]-'A';
  52.             if (index<3){
  53.                 return 0;
  54.             }else if (index>=3){
  55.                 return 1;
  56.             }
  57.             return 0;
  58.         }
  59.     }

  60.     public static class AllSortV2Reduce extends Reducer<AllSortKey, Text, NullWritable,Text>{

  61.         @Override
  62.         public void reduce(AllSortKey allSortKey, Iterable<Text> values, Context context) throws IOException, InterruptedException {

  63.             for (Text value: values){
  64.                 context.write(NullWritable.get(),new Text( allSortKey.getFirstKey() +" "+ value));
  65.             }
  66.         }

  67.     }
  68. }




阅读(912) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~