TopN算法是一个经典的算法,由于每个map都只是实现了本地的TopN算法,而假设map有M个,在归约的阶段只有M x N个,这个结果是可以接受的并不会造成性能瓶颈。
这个TopN算法在map阶段将使用TreeMap来实现排序,以到达可伸缩的目的。
实现步骤:
1、在mapper中定义TreeMap 将输入数据按put到TreeMap中,如果TreeMap的size大于N,需要移除最小的数据
2、在以往的mapper中,我们都是处理一条数据之后就context.write或者output.collector一次。而在这里是把所有数据处理完之后再进行写入。所以,我们可以把这个context.write放在cleanup里执行。
cleanup就是整个mapper task执行完之后会执行的一个函数。
3、在reduce中定义TreeMap 将输入数据按put到TreeMap中,对Mapper输出的数据在进行一次汇总,选出其中的Top K
具体实现如下:
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.util.GenericOptionsParser;
-
-
import java.io.IOException;
-
import java.util.Comparator;
-
import java.util.TreeMap;
-
-
public class TopTenJob {
-
public static class TopTenMapper extends Mapper<LongWritable ,Text, NullWritable, Text>{
-
private static TreeMap<Long, String> map = new TreeMap<Long,String >();
-
@Override
-
public void map(LongWritable key, Text value, Context context){
-
-
String strValue = value.toString();
-
String []arrays = strValue.split("\t");
-
-
map.put(Long.parseLong(arrays[1]), strValue);
-
if(map.size()>10){
-
map.remove(map.firstKey());
-
}
-
}
-
-
@Override
-
protected void cleanup(Context context
-
) throws IOException, InterruptedException {
-
for (String str: map.values())
-
context.write(NullWritable.get(), new Text(str));
-
}
-
}
-
-
-
public static class TopTenReduce extends Reducer<NullWritable, Text, NullWritable, Text>{
-
private static TreeMap<Long, String> topTenMap = new TreeMap<>(new Comparator() {
-
@Override
-
public int compare(Object o1, Object o2) {
-
Long lo1 = (Long)o1;
-
Long lo2 = (Long)o2;
-
return lo2.compareTo(lo1);
-
}
-
});
-
@Override
-
public void reduce (NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
for(Text text:values){
-
String value = text.toString();
-
String [] arrays = value.split("\t");
-
-
topTenMap.put(Long.parseLong(arrays[1]),value);
-
if(topTenMap.size()>10){
-
topTenMap.remove(topTenMap.firstKey() );
-
}
-
}
-
-
for (String strVal : topTenMap.values()){
-
context.write(NullWritable.get(), new Text(strVal));
-
}
-
}
-
-
}
-
-
public static void main(String []args){
-
-
Job job = null;
-
try {
-
job = Job.getInstance();
-
job.setJobName("TopTenJob");
-
job.setJarByClass(TopTenJob.class);
-
-
job.setMapperClass(TopTenMapper.class);
-
job.setMapOutputKeyClass(NullWritable.class);
-
job.setOutputValueClass(Text.class);
-
-
job.setReducerClass(TopTenReduce.class);
-
job.setOutputKeyClass(NullWritable.class);
-
job.setOutputValueClass(Text.class);
-
-
String []arrays = new GenericOptionsParser(args).getRemainingArgs();
-
FileInputFormat.addInputPath(job, new Path(arrays[0]));
-
FileOutputFormat.setOutputPath(job, new Path(arrays[1]));
-
-
job.setNumReduceTasks(1);
-
-
System.out.println(job.waitForCompletion(true));
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
-
}
-
}
阅读(1260) | 评论(0) | 转发(0) |