总结:
1、当reducenum 设置为1时,自定义的分区函数不会被调用,这可以理解
2、在实现compare函数时要注意:两个对象的firstKey值不相等时要直接返回比较结果,只有两者相等时才继续比较secondKey。自己在实现时没有注意到这个问题,随便写了,调试了半天也没有发现问题。
错误写法如下:
-
// Intentionally INCORRECT comparator, kept only to illustrate the mistake:
// the branch condition below is inverted.
public int compare(WritableComparable a, WritableComparable b){
-
-
SecondSortKey keyA = (SecondSortKey)a;
-
SecondSortKey keyB = (SecondSortKey)b;
-
// Debug trace of both composite keys (disabled).
//System.out.println("SecondSortComparator , keyA:<"+keyA.getFirstKey() +","+ keyA.getSecondKey() +"> keyB:<"+keyB.getFirstKey() +","+ keyB.getSecondKey()+">");
-
int result = keyA.getFirstKey().compareTo(keyB.getFirstKey());
-
// BUG: when the first keys differ, their ordering is overwritten by the
// second-key comparison; when they are equal, 0 is returned and the second
// key is never consulted. The test should be the other way around.
if(result!=0){
-
result = keyA.getSecondKey().compareTo(keyB.getSecondKey());
-
}
-
return result;
-
}
3、编写SortComparator和GroupComparator时,要注意实现构造函数并在其中调用super(SecondSortKey.class, true),否则会报空指针异常:
/**
 * Registers {@code SecondSortKey} as the key class handled by this comparator.
 * The second argument is {@code WritableComparator}'s create-instances flag;
 * it is passed as {@code true} here.
 */
protected SecondSortComparator() {
    super(SecondSortKey.class, true);
}
-
1、自定义组合键
-
}
-
-
@Override
-
public boolean equals(Object o){
-
if(!(o instanceof SecondSortKey)){
-
return false;
-
}
-
-
SecondSortKey sortKey =(SecondSortKey) o;
-
return this.firstKey.equals(sortKey.getFirstKey());
-
}
-
-
@Override
-
public int hashCode(){
-
return this.firstKey.hashCode();
-
}
-
@Override
-
public void write(DataOutput out) throws IOException {
-
this.firstKey.write(out);
-
this.secondKey.write(out);
-
}
-
-
@Override
-
public void readFields(DataInput in) throws IOException {
-
this.firstKey.readFields(in);
-
this.secondKey.readFields(in);
-
}
2、自定义分区
-
public static class SecondSortPartitioner extends Partitioner<SecondSortKey, IntWritable> {
-
@Override
-
public int getPartition(SecondSortKey key, IntWritable value, int numPartitions) {
-
return (key.getFirstKey().hashCode())%numPartitions;
-
}
-
}
3、自定义排序
-
public static class SecondSortComparator extends WritableComparator {
-
-
protected SecondSortComparator(){
-
super(SecondSortKey.class,true);
-
}
-
-
@Override
-
public int compare(WritableComparable a, WritableComparable b){
-
-
SecondSortKey keyA = (SecondSortKey)a;
-
SecondSortKey keyB = (SecondSortKey)b;
-
//System.out.println("SecondSortComparator , keyA:<"+keyA.getFirstKey() +","+ keyA.getSecondKey() +"> keyB:<"+keyB.getFirstKey() +","+ keyB.getSecondKey()+">");
-
int result = keyA.getFirstKey().compareTo(keyB.getFirstKey());
-
if(result!=0){
-
return result;
-
}else {
-
result = keyA.getSecondKey().compareTo(keyB.getSecondKey());
-
}
-
return result;
-
}
-
}
4、自定义分组排序
-
public static class SecondSortGroupComparator extends WritableComparator {
-
protected SecondSortGroupComparator(){
-
super(SecondSortKey.class,true);
-
}
-
@Override
-
public int compare(WritableComparable a, WritableComparable b){
-
SecondSortKey keyA = (SecondSortKey) a;
-
SecondSortKey keyB = (SecondSortKey) b;
-
System.out.println("SecondSortGroupComparator , keyA:<"+keyA.getFirstKey() +","+ keyA.getSecondKey() +"> keyB:<"+keyB.getFirstKey() +","+ keyB.getSecondKey()+">");
-
int result = keyA.getFirstKey().compareTo(keyB.getFirstKey());
-
if(result!=0){
-
return result;
-
}else {
-
result = keyA.getSecondKey().compareTo(keyB.getSecondKey());
-
}
-
-
return result;
-
}
-
}
5、实现map
-
public static class SecondSortMapper extends Mapper<LongWritable, Text, SecondSortKey, IntWritable>{
-
@Override
-
protected void map(LongWritable key, Text value,
-
Context context) throws IOException, InterruptedException {
-
String strValue = value.toString();
-
if(strValue==null||strValue.length()==0){
-
return;
-
}
-
-
String []array = strValue.split("\t");
-
// System.out.println("map recv data:"+array[0]+" -- "+ array[1]);
-
SecondSortKey secondSortKey = new SecondSortKey(new Text(array[0].trim()),new IntWritable(Integer.parseInt(array[1].trim())));
-
context.write(secondSortKey, new IntWritable(Integer.parseInt(array[1].trim())));
-
}
-
}
6、reduce实现
-
public static class SecondSortReducer extends Reducer<SecondSortKey, IntWritable, NullWritable, Text>{
-
@Override
-
protected void reduce(SecondSortKey key,Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
-
for (IntWritable value: values){
-
String data = key.getFirstKey()+" "+ String.valueOf(value.get());
-
context.write(NullWritable.get(), new Text(data));
-
}
-
}
-
}
7、主程序
-
public static void main(String []args){
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("SecondSort");
-
job.setJarByClass(SecondSortJob.class);
-
String []arrays = new GenericOptionsParser(args).getRemainingArgs();
-
FileInputFormat.setInputPaths(job,new Path(arrays[0]));
-
-
FileSystem fs = FileSystem.get(new URI(arrays[1]),job.getConfiguration());
-
if(fs.exists(new Path(arrays[1]))){
-
//System.out.println("del:"+ arrays[1]);
-
fs.delete(new Path(arrays[1]),true);
-
}
-
FileOutputFormat.setOutputPath(job,new Path(arrays[1]));
-
-
job.setMapperClass(SecondSortMapper.class);
-
job.setMapOutputKeyClass(SecondSortKey.class);
-
job.setMapOutputValueClass(IntWritable.class);
-
-
job.setReducerClass(SecondSortReducer.class);
-
job.setOutputKeyClass(NullWritable.class);
-
job.setOutputValueClass(Text.class);
-
-
job.setPartitionerClass(SecondSortPartitioner.class);
-
job.setSortComparatorClass(SecondSortComparator.class);
-
job.setGroupingComparatorClass(SecondSortGroupComparator.class);
-
-
// job.setInputFormatClass(FileInputFormat.class);
-
// job.setOutputFormatClass(FileOutputFormat.class);
-
-
job.setNumReduceTasks(1);
-
boolean ret= job.waitForCompletion(true);
-
if(ret){
-
System.out.println("succ");
-
}else {
-
System.out.println("fail");
-
}
-
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
} catch (URISyntaxException e) {
-
e.printStackTrace();
-
}
-
-
}
输入(每行两个字段,以制表符分隔):
A	5
B	3
B	8
B	5
A	2
C	2
C	8
C	5
输出:
A 2
A 5
B 3
B 5
B 8
C 2
C 5
C 8
阅读(783) | 评论(0) | 转发(0) |