TopN就是找出数据中排在最前边或者最后的N个数。假设原始数据是K,V形式存储在文本文件中。
要实现这个需求如下步骤
1、首先把数据按行读入并分隔成KeyValue形式
2、需要对key相同的数据进行聚合
3、按value排序,取topN,对value排序有3种方案
A) 对多个分区分别使用SortedMap,将value以作为SortedMap键,key作为SortedMap的value保存到SortedMap中,如果SortedMap 的size大于N,则删除首个值,这样每个分区最终得到的StoreMap就是N。最后对多个分区的SortedMap进行合并,取TopN
B) 使用RDD的takeOrdered,对value进行排序,需要自定义排序类,此类需要继承序列化类
C) 使用RDD的top对value进行排序,需要自定义排序类,此类需要继承序列化类
方案一:
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.api.java.function.FlatMapFunction;
-
import org.apache.spark.api.java.function.Function2;
-
import org.apache.spark.api.java.function.PairFunction;
-
import org.apache.spark.broadcast.Broadcast;
-
import scala.Tuple2;
-
-
import java.io.Serializable;
-
import java.util.*;
-
-
public class Top10NoUnique {
-
public static class compare implements Comparator<Integer>, Serializable{
-
public final static compare Instance= new compare();
-
@Override
-
public int compare(Integer o1, Integer o2) {
-
return o1-o2;
-
}
-
}
-
public static void main(final String args[]){
-
-
SparkConf conf = new SparkConf();
-
conf.setMaster("local").setAppName("Top10NoUnique");
-
-
JavaSparkContext sc = new JavaSparkContext(conf);
-
JavaRDD<String> lines = sc.textFile("e:\\tmp\\input\\top10nonunique.txt");
-
-
//定义共享变量
-
final Broadcast<Integer> topN = sc.broadcast(10);
-
//对数据进行分区
-
JavaRDD<String> parLines = lines.coalesce(2);
-
JavaPairRDD<String, Integer> pairRDD =parLines.mapToPair(new PairFunction<String, String, Integer>() {
-
@Override
-
public Tuple2<String, Integer> call(String s) throws Exception {
-
s=s.trim();
-
String []arrs = s.split(",");
-
return new Tuple2<String, Integer>(arrs[0], Integer.parseInt(arrs[1]));
-
}
-
});
-
-
List<Tuple2<String,Integer>> list = pairRDD.collect();
-
for (Tuple2<String, Integer> tuple2 : list){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
-
//对Key合并value值
-
JavaPairRDD<String, Integer> uniquePairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
-
@Override
-
public Integer call(Integer v1, Integer v2) throws Exception {
-
return v1+v2;
-
}
-
});
-
-
list = uniquePairRDD.collect();
-
for (Tuple2<String, Integer> tuple2 : list){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
-
//计算topN
-
JavaRDD<SortedMap<Integer, String>> sortedMapJavaRDD = uniquePairRDD.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() {
-
@Override
-
public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
-
SortedMap<Integer, String> sortedMap = new TreeMap<Integer, String>(compare.Instance);
-
while (tuple2Iterator.hasNext()){
-
Tuple2<String, Integer> tuple2 = tuple2Iterator.next();
-
sortedMap.put(tuple2._2(), tuple2._1());
-
if(sortedMap.size() > topN.value()){
-
sortedMap.remove(sortedMap.firstKey());
-
}
-
}
-
return Collections.singletonList(sortedMap).iterator();
-
}
-
});
-
-
//对多个分区进行合并
-
List<SortedMap<Integer, String>> sortedMapList = sortedMapJavaRDD.collect();
-
SortedMap<Integer,String> top10Map = new TreeMap<>();
-
for (SortedMap<Integer,String> map: sortedMapList){
-
for (Map.Entry<Integer,String> entry: map.entrySet()){
-
top10Map.put(entry.getKey(),entry.getValue());
-
if(top10Map.size()>topN.value()){
-
top10Map.remove(top10Map.firstKey());
-
}
-
}
-
}
-
System.out.println("----------top 10------------------");
-
for (Map.Entry<Integer,String> entry: top10Map.entrySet()){
-
System.out.println(entry.getKey() +" "+ entry.getValue());
-
}
-
}
-
}
方案二:
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.api.java.function.Function2;
-
import org.apache.spark.api.java.function.PairFunction;
-
import scala.Tuple2;
-
-
import java.io.Serializable;
-
import java.util.Comparator;
-
import java.util.List;
-
-
public class Top10TaskOrder {
-
public static class Comp implements Comparator<Tuple2<String,Integer>> , Serializable{
-
-
public final static Comp INSTANCE = new Comp();
-
@Override
-
public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
-
return o2._2().compareTo(o1._2());
-
}
-
}
-
public static void main(String []args) {
-
SparkConf conf = new SparkConf();
-
conf.setMaster("local").setAppName("Top10TaskOrder");
-
JavaSparkContext sc = new JavaSparkContext(conf);
-
JavaRDD<String> lines =sc.textFile("e:\\tmp\\input\\top10nonunique.txt");
-
JavaRDD<String> partLines = lines.coalesce(2);
-
-
JavaPairRDD<String, Integer> pairRDD = partLines.mapToPair(new PairFunction<String, String, Integer>() {
-
@Override
-
public Tuple2<String, Integer> call(String s) throws Exception {
-
s =s.trim();
-
String []splits = s.split(",");
-
-
return new Tuple2<String, Integer>(splits[0], Integer.parseInt(splits[1]));
-
}
-
});
-
-
JavaPairRDD<String , Integer> reducePairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
-
@Override
-
public Integer call(Integer v1, Integer v2) throws Exception {
-
return v1+v2;
-
}
-
});
-
-
List<Tuple2<String, Integer> > takeOrderPairRDD = reducePairRDD.takeOrdered(10, Comp.INSTANCE);
-
-
System.out.println("--------------------top list------------------");
-
for (Tuple2<String, Integer> tuple2: takeOrderPairRDD){
-
System.out.println(tuple2._2() +" "+ tuple2._1());
-
}
-
-
}
阅读(11021) | 评论(0) | 转发(0) |