分区模式将记录进行分类,但它并不关心记录的顺序。其主要目的是将数据集中相似的记录分成不同的、更小的数据集。
分区的主要原理是:自定义分区类继承Partitioner,并根据业务需求实现分区函数 public int getPartition(Text key, Text value, int numPartitions),将Key相同的记录划分到同一个reduce任务中。需要注意的是,如果在驱动程序中将NumReduceTasks的值设置为1,则不会执行分区函数。这一点可以理解:毕竟只有1个reduce,没有必要再进行分区。
Partitioner
具体实现如下:
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Partitioner;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-
import java.io.IOException;
-
-
public class PartitionJob {
-
-
public static class PartitionMapper extends Mapper<LongWritable ,Text, Text,Text>{
-
private Text tmpKey = new Text();
-
@Override
-
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-
String data = value.toString();
-
String []arrays = data.split("\t");
-
tmpKey.set(arrays[0]);
-
System.out.println("data:"+data);
-
context.write(tmpKey, value);
-
}
-
}
-
-
public static class PartitionReduce extends Reducer<Text, Text, NullWritable, Text>{
-
private Text out = new Text();
-
@Override
-
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
String str="";
-
for (Text val:values){
-
context.write(NullWritable.get(), val);
-
}
-
}
-
-
}
-
-
public static class Partition extends Partitioner<Text ,Text>{
-
@Override
-
public int getPartition(Text key, Text value, int numPartitions) {
-
return Math.abs(key.hashCode())%numPartitions;
-
}
-
-
}
-
-
public static void main(String []args){
-
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("PartitionJob");
-
job.setJarByClass(PartitionJob.class);
-
job.setMapperClass(PartitionMapper.class);
-
job.setMapOutputKeyClass(Text.class);
-
job.setMapOutputValueClass(Text.class);
-
-
job.setReducerClass(PartitionReduce.class);
-
job.setOutputKeyClass(NullWritable.class);
-
job.setOutputValueClass(Text.class);
-
job.setPartitionerClass(Partition.class);
-
job.setNumReduceTasks(2);
-
-
FileInputFormat.addInputPath(job, new Path(args[0]));
-
FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);
-
-
System.out.println(job.waitForCompletion(true));
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
-
-
}
-
}
阅读(3078) | 评论(0) | 转发(0) |