MapReduce默认输出是在单个reduce中以key排序的,多个reduce输出之间是不排序的,
所谓全排序,就是指多个reduce之间的输出是有序的。
实现全排序有三种方法:
1、 使用一个reduce进行输出
MapReduce 默认是以key排序的,使用一个reduce,就可以实现全排序。
这种方法简单,但不能充分利用集群的计算资源,而且在数据量很大的情况下,很有可能会出现OOM问题。
2、 自定义分区函数实现全排序
MapReduce默认的分区函数是HashPartitioner,其实现的原理是计算map输出key的 hashCode ,然后对Reduce个数求模,这样只要求模结果一样的Key都会发送到同一个Reduce。我们可以根据自己的业务需求,自定义分区函数,将Key按范围划分,将某个范围内的值,划分到一个reduce中,如
·
所有 Key < 10000 的数据都发送到Reduce 0;
·
所有 10000 < Key < 20000 的数据都发送到Reduce 1;
·
其余的Key都发送到Reduce 2;
这就实现了Reduce 0的数据一定全部小于Reduce 1,且Reduce 1的数据全部小于Reduce 2,再加上同一个Reduce里面的数据局部有序,这样就实现了数据的全局有序
这种方法,虽然能够将数据分散到多个Reduce中,但是,必须手动地找到各个Reduce的分界点,尽量使得分散到每个Reduce的数据量均衡。而且每次修改Reduce的个数时,都得手动去找一次Key的分界点!非常不灵活
3、 使用TotalOrderPartitioner进行全排序
Hadoop内置有个 HashPartitioner 分区实现类,MapReduce默认就是使用它;但Hadoop内置还有个名为 TotalOrderPartitioner 的分区实现类,这个就是解决全排序的问题。也就是根据Key的分界点将不同的Key发送到相应的分区。
而这里用到的分界点是由程序解决的,而不是人工手工分区的
第一种方法,简单,就不实现了。
第二种方法实现,步骤如下:
1、自定义key,实现compareTo
2、自定义partitioner,应该清楚整个Key的索引范围
具体实现如下:
注意点:
1、自定义key是需要定义,默认构造函数,否则在运行是会报错
2、自定义
partitioner 时,key和value 类型要和map输出的key和value的类型必须一致
-
public class AllSortKey implements WritableComparable<AllSortKey> {
-
private Text firstKey = new Text();
-
private IntWritable secondKey= new IntWritable();
-
-
public IntWritable getSecondKey() {
-
return secondKey;
-
}
-
-
public void setSecondKey(IntWritable secondKey) {
-
this.secondKey = secondKey;
-
}
-
-
public Text getFirstKey() {
-
return firstKey;
-
}
-
-
public void setFirstKey(Text firstKey) {
-
this.firstKey = firstKey;
-
}
-
-
-
public AllSortKey(){
-
-
}
-
public AllSortKey(Text firstKey, IntWritable secondKey){
-
this.firstKey = firstKey;
-
this.secondKey =secondKey;
-
}
-
-
-
@Override
-
public int compareTo(AllSortKey o) {
-
int ret = this.firstKey.compareTo(o.firstKey);
-
if(ret!=0){
-
return ret;
-
}else {
-
ret = this.secondKey.compareTo(o.secondKey);
-
}
-
return ret;
-
}
-
-
@Override
-
public void write(DataOutput out) throws IOException {
-
this.firstKey.write(out);
-
this.secondKey.write(out);
-
}
-
-
@Override
-
public void readFields(DataInput in) throws IOException {
-
this.firstKey.readFields(in);
-
this.secondKey.readFields(in);
-
}
-
}
-
public class AllSortV2Job {
-
public static void main(String []args){
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("AllSortV2Job");
-
job.setJarByClass(AllSortV2Job.class);
-
-
String []arrays = new GenericOptionsParser(args).getRemainingArgs();
-
FileInputFormat.setInputPaths(job,new Path(arrays[0]));
-
FileOutputFormat.setOutputPath(job,new Path(arrays[1]));
-
-
FileSystem fs = FileSystem.get(new URI( arrays[1]),job.getConfiguration());
-
if(fs.exists(new Path(arrays[1]))){
-
fs.delete(new Path(arrays[1]),true);
-
}
-
-
job.setMapperClass(AllSortV2Mapper.class);
-
job.setMapOutputKeyClass(AllSortKey.class);
-
job.setMapOutputValueClass(Text.class);
-
-
job.setReducerClass(AllSortV2Reduce.class);
-
job.setOutputKeyClass(NullWritable.class);
-
job.setOutputValueClass(Text.class);
-
-
job.setPartitionerClass(AllSortV2Partitioner.class);
-
-
job.setNumReduceTasks(2);
-
boolean ret = job.waitForCompletion(true);
-
System.out.println("ret:"+ret);
-
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
} catch (URISyntaxException e) {
-
e.printStackTrace();
-
}
-
-
}
-
public static class AllSortV2Mapper extends Mapper<LongWritable ,Text , AllSortKey, Text>{
-
@Override
-
public void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
-
String strValue = value.toString();
-
System.out.println("strValue:"+strValue);
-
if(strValue==null||strValue.length()==0){
-
return;
-
}
-
-
String []array = strValue.split("\t");
-
System.out.println("array0:"+array[0] +" array[1]"+array[1]);
-
AllSortKey allSortKey = new AllSortKey( new Text(array[0]), new IntWritable(Integer.parseInt(array[1])));
-
context.write(allSortKey, new Text(array[1]));
-
}
-
}
-
-
public static class AllSortV2Partitioner extends Partitioner<AllSortKey, Text>{
-
-
@Override
-
public int getPartition(AllSortKey allSortKey, Text text, int numPartitions) {
-
-
int index = allSortKey.getFirstKey().toString().toCharArray()[0]-'A';
-
if (index<3){
-
return 0;
-
}else if (index>=3){
-
return 1;
-
}
-
return 0;
-
}
-
}
-
-
public static class AllSortV2Reduce extends Reducer<AllSortKey, Text, NullWritable,Text>{
-
-
@Override
-
public void reduce(AllSortKey allSortKey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
-
for (Text value: values){
-
context.write(NullWritable.get(),new Text( allSortKey.getFirstKey() +" "+ value));
-
}
-
}
-
-
}
-
}
阅读(912) | 评论(0) | 转发(0) |