分类: 云计算
2018-06-14 20:26:56
假设有如下原始数据DATA1,我们需要通过Map Reducer得到DATA3.
DATA1 |
DATA2 |
DATA3 |
||
A 5 |
A 5 |
A 2 |
||
B 3 |
A 2 |
A 5 |
||
B 8 |
B 3 |
B 3 |
||
B 5 |
B 8 |
B 5 |
||
A 2 |
B 5 |
B 8 |
||
C 2 |
C 2 |
C 2 |
||
C 8 |
C 8 |
C 5 |
||
C 5 |
C 5 |
C 8 |
默认情况下,Map Reducer输出的结果只会对Key进行默认的排序,而不会对value排序,默认情况下会得到DATA2,需要得到DATA3有两种方案:
1、 在reduce中对Value排序
2、 二次排序,所谓二次排序对key排序的同时还需要对Value进行排序
实现比较简单,代码如下:
点击(此处)折叠或打开
注意点:
1、 在reduce中对values进行迭代的时候,不要直接存储key或values,因为reduce可能会执行多次,但key和value相关对象只有两个,直接存储key或values,会造成值混乱。需要用相应的数据类型.get()取出后再存储。
2、 当values数据量较大时,可能会造成out of memory
3、 需要等待所有的value值接收完成,才能进行sort
MapReduce默认是以Key排序的,我们是否通过 重新定义key来实现二次排序?
先学习下Map Reduce处理流程
通过以上学习,我们可以通过自定义key,这样map端数据的格式就是<(key,value),value>,
这样只需在分区的时候把,把key相同的值划分到同一个reduce中。在reduce中,我们只需要对组合key排序和分组即可实现。
通过以上分析,进行二次排序需要如下步骤
1、 自定义组合key,
组合key由key和要排序的value组成,要实现WritableComparable接口,并且实现compareTo()方法的比较策略
2、 自定义分区函数
将相同Key分配到同一个reduce中,要继承Partitioner,重写getPartition函数
自定义分区函数类FirstPartitioner,是key的第一次比较,完成对所有key的排序。
public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>
在job中使用setPartitionerClasss()方法设置Partitioner
job.setPartitionerClasss(FirstPartitioner.Class);
3、 自定义分组比较器
定义这个比较器,可以有两种方式。
1) 继承 WritableComparator。
public static class GroupingComparator extends WritableComparator
必须有一个构造函数,并且重载以下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 实现接口 RawComparator。
上面两种实现方式,在 Job 中,可以通过 setGroupingComparatorClass()方法来设置分组类。
job.setGroupingComparatorClass(GroupingComparator.Class);
4、 自定义排序比较器
这是Key的第二次比较,对所有的Key进行排序,即同时完成IntPair中的first和second排序。该类是一个比较器,可以通过两种方式实现。
1) 继承WritableComparator。
public static class KeyComparator extends WritableComparator
必须有一个构造函数,并且重载以下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 实现接口 RawComparator。
上面两种实现方式,在Job中,可以通过setSortComparatorClass()方法来设置Key的比较类。
job.setSortComparatorClass(KeyComparator.Class);
注意:如果没有使用自定义的SortComparator类,则默认使用Key中compareTo()方法对Key排序。