Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1106552
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 云计算

2018-06-14 20:26:56

假设有如下原始数据DATA1,我们需要通过Map Reducer得到DATA3.

DATA1

DATA2

DATA3

A   5 

A   5 

A   2 

B   3 

A   2 

A   5 

B   8 

B   3 

B   3 

B   5 

B   8 

B   5 

A   2 

B   5 

B   8

C   2

C   2

C   2

C   8 

C   8 

C   5 

C   5

C   5

C   8


 默认情况下,Map Reducer输出的结果只会对Key进行默认的排序,而不会对value排序,默认情况下会得到DATA2,需要得到DATA3有两种方案:

1、  在reduce中对Value排序

2、  二次排序,所谓二次排序对key排序的同时还需要对Value进行排序


在reduce中对value排序

 实现比较简单,代码如下:


点击(此处)折叠或打开

  1. @Override

  2. public void reduce(Text key, Iterable values, Context context)

  3.                    throws IOException, InterruptedException {

  4.           List valuesList = new ArrayList();

  5.           // 获取value

  6.           for(IntWritable value : values) {

  7.                valuesList.add(value.get());

  8.           }

  9.           // 排序

  10.           Collections.sort(valuesList);

  11.           for(Integer value : valuesList) {

  12.                 context.write(key, new IntWritable(value));

  13.           }
  14. }


注意点:


1、  在reduce中对values进行迭代的时候,不要直接存储key或values,因为reduce可能会执行多次,但key和value相关对象只有两个,直接存储key或values,会造成值混乱。需要用相应的数据类型.get()取出后再存储。

2、  当values数据量较大时,可能会造成out of memory

3、  需要等待所有的value值接收完成,才能进行sort


二次排序

MapReduce默认是以Key排序的,我们是否通过 重新定义key来实现二次排序?

先学习下Map Reduce处理流程


通过以上学习,我们可以通过自定义key,这样map端数据的格式就是<(key,value),value>,

这样只需在分区的时候把,把key相同的值划分到同一个reduce中。在reduce中,我们只需要对组合key排序和分组即可实现。

通过以上分析,进行二次排序需要如下步骤

1、 自定义组合key,

 组合key由key和要排序的value组成,要实现WritableComparable接口,并且实现compareTo()方法的比较策略

2、 自定义分区函数

将相同Key分配到同一个reduce中,要继承Partitioner,重写getPartition函数
      自定义分区函数类FirstPartitioner,是key的第一次比较,完成对所有key的排序。

 public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>

 在job中使用setPartitionerClasss()方法设置Partitioner

 job.setPartitionerClasss(FirstPartitioner.Class);

3、 自定义分组比较器

定义这个比较器,可以有两种方式。

1) 继承 WritableComparator。

   public static class GroupingComparator extends WritableComparator

   必须有一个构造函数,并且重载以下方法。

   public int compare(WritableComparable w1, WritableComparable w2)

2) 实现接口 RawComparator。

上面两种实现方式,在 Job 中,可以通过 setGroupingComparatorClass()方法来设置分组类。

 job.setGroupingComparatorClass(GroupingComparator.Class);

4、 自定义排序比较器

 这是Key的第二次比较,对所有的Key进行排序,即同时完成IntPair中的first和second排序。该类是一个比较器,可以通过两种方式实现。

 1) 继承WritableComparator。

     public static class KeyComparator extends WritableComparator

     必须有一个构造函数,并且重载以下方法。

      public int compare(WritableComparable w1, WritableComparable w2)

  2) 实现接口 RawComparator。

    上面两种实现方式,在Job中,可以通过setSortComparatorClass()方法来设置Key的比较类。

job.setSortComparatorClass(KeyComparator.Class);
    注意:如果没有使用自定义的SortComparator类,则默认使用Key中compareTo()方法对Key排序。

阅读(4125) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~