Category: Big Data

2018-11-08 20:08:48

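TopN means finding the N records that rank first or last in a data set. Assume the raw data is stored in a text file as key,value pairs, one pair per line.
The requirement can be met in the following steps:
1. Read the data line by line and split each line into a key/value pair.
2. Aggregate the values of records that share the same key.
3. Sort by value and take the top N. There are three approaches to sorting by value:
   A) Use a SortedMap in each partition, with the value as the SortedMap key and the key as the SortedMap value. Whenever the SortedMap holds more than N entries, remove the first (smallest) one, so each partition ends up with a SortedMap of at most N entries. Finally, merge the SortedMaps of all partitions and take the top N. Because the value serves as the map key, two keys with the same value overwrite each other.
   B) Use the RDD's takeOrdered to sort by value. This needs a custom comparator class, which must also implement Serializable.
   C) Use the RDD's top to sort by value. This also needs a custom comparator class that implements Serializable.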
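
Before the Spark versions, the three steps above can be sketched in plain Java on a single machine. This is only an illustration under assumptions: the class name TopNPlainJava, its helper methods, and the sample lines are hypothetical and are not part of the Spark listings in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TopNPlainJava {
    // Steps 1 and 2: parse "key,value" lines and sum the values of equal keys.
    static Map<String, Integer> aggregate(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split(",");
            totals.merge(parts[0], Integer.parseInt(parts[1]), Integer::sum);
        }
        return totals;
    }

    // Step 3: sort by total, largest first, and keep the first n entries.
    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> totals, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(totals.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
    }

    public static void main(String[] args) {
        // Hypothetical sample input, not taken from the original data file.
        List<String> lines = Arrays.asList("a,3", "b,5", "a,4", "c,1");
        for (Map.Entry<String, Integer> e : topN(aggregate(lines), 2)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Sorting the whole aggregated map is fine for a small local example. The Spark approaches avoid it by keeping at most N candidates per partition.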

Approach 1:


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

import java.io.Serializable;
import java.util.*;

public class Top10NoUnique {
    // Serializable comparator that orders the totals in ascending order.
    public static class ValueComparator implements Comparator<Integer>, Serializable {
        public static final ValueComparator INSTANCE = new ValueComparator();

        @Override
        public int compare(Integer o1, Integer o2) {
            // Integer.compare avoids the overflow that o1 - o2 can produce.
            return Integer.compare(o1, o2);
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("Top10NoUnique");

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("e:\\tmp\\input\\top10nonunique.txt");

        // Broadcast N as a shared variable.
        final Broadcast<Integer> topN = sc.broadcast(10);
        // Coalesce the data into at most two partitions.
        JavaRDD<String> parLines = lines.coalesce(2);
        // Split each "key,value" line into a pair.
        JavaPairRDD<String, Integer> pairRDD = parLines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] arrs = s.trim().split(",");
                return new Tuple2<String, Integer>(arrs[0], Integer.parseInt(arrs[1]));
            }
        });

        // Print the parsed pairs (debug output).
        List<Tuple2<String, Integer>> list = pairRDD.collect();
        for (Tuple2<String, Integer> tuple2 : list) {
            System.out.println(tuple2._1() + " " + tuple2._2());
        }

        // Sum the values of each key.
        JavaPairRDD<String, Integer> uniquePairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Print the aggregated pairs (debug output).
        list = uniquePairRDD.collect();
        for (Tuple2<String, Integer> tuple2 : list) {
            System.out.println(tuple2._1() + " " + tuple2._2());
        }

        // Compute the top N within each partition.
        JavaRDD<SortedMap<Integer, String>> sortedMapJavaRDD = uniquePairRDD.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, String>>() {
            @Override
            public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                SortedMap<Integer, String> sortedMap = new TreeMap<Integer, String>(ValueComparator.INSTANCE);
                while (tuple2Iterator.hasNext()) {
                    Tuple2<String, Integer> tuple2 = tuple2Iterator.next();
                    // The total is the map key, so keys with equal totals overwrite each other.
                    sortedMap.put(tuple2._2(), tuple2._1());
                    if (sortedMap.size() > topN.value()) {
                        // Drop the smallest total to keep at most N entries.
                        sortedMap.remove(sortedMap.firstKey());
                    }
                }
                return Collections.singletonList(sortedMap).iterator();
            }
        });

        // Merge the per-partition results on the driver.
        List<SortedMap<Integer, String>> sortedMapList = sortedMapJavaRDD.collect();
        SortedMap<Integer, String> top10Map = new TreeMap<>();
        for (SortedMap<Integer, String> map : sortedMapList) {
            for (Map.Entry<Integer, String> entry : map.entrySet()) {
                top10Map.put(entry.getKey(), entry.getValue());
                if (top10Map.size() > topN.value()) {
                    top10Map.remove(top10Map.firstKey());
                }
            }
        }
        System.out.println("----------top 10------------------");
        for (Map.Entry<Integer, String> entry : top10Map.entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }

        sc.stop();
    }
}
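
One caveat of the SortedMap technique is that the total is used as the map key, so two keys with the same total cannot both survive. The sketch below reproduces that effect in plain Java and shows one possible alternative, a bounded min-heap of whole entries. It is an assumption-laden illustration rather than part of the original program: the class TieAwareTopN, both method names, and the sample totals are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.TreeMap;

final class TieAwareTopN {
    // Same technique as the listing: the total is the map key, so equal totals collide.
    static SortedMap<Integer, String> topNWithSortedMap(List<Map.Entry<String, Integer>> pairs, int n) {
        SortedMap<Integer, String> sorted = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            sorted.put(p.getValue(), p.getKey());
            if (sorted.size() > n) {
                sorted.remove(sorted.firstKey());
            }
        }
        return sorted;
    }

    // Alternative: a min-heap of whole entries keeps keys with equal totals apart.
    static List<Map.Entry<String, Integer>> topNWithHeap(List<Map.Entry<String, Integer>> pairs, int n) {
        Comparator<Map.Entry<String, Integer>> byValue = Map.Entry.comparingByValue();
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(byValue);
        for (Map.Entry<String, Integer> p : pairs) {
            heap.add(p);
            if (heap.size() > n) {
                heap.poll(); // drop the entry with the smallest total
            }
        }
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        result.sort(byValue.reversed()); // largest total first
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical aggregated totals; "a" and "b" tie at 5.
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>("a", 5));
        pairs.add(new SimpleEntry<>("b", 5));
        pairs.add(new SimpleEntry<>("c", 1));
        System.out.println(topNWithSortedMap(pairs, 2));
        System.out.println(topNWithHeap(pairs, 2));
    }
}
```

With this sample the SortedMap version keeps "c" and "b" and silently loses "a", while the heap version returns both keys whose total is 5. Whether ties matter depends on the data, so the simpler SortedMap form may still be acceptable when totals are known to be unique.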
Approach 2:


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Comparator;
import java.util.List;

public class Top10TaskOrder {
    // Serializable comparator that orders pairs by total, largest first.
    public static class Comp implements Comparator<Tuple2<String, Integer>>, Serializable {

        public static final Comp INSTANCE = new Comp();

        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
            // Reversed comparison, so takeOrdered returns the largest totals.
            return o2._2().compareTo(o1._2());
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("Top10TaskOrder");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("e:\\tmp\\input\\top10nonunique.txt");
        // Coalesce the data into at most two partitions.
        JavaRDD<String> partLines = lines.coalesce(2);

        // Split each "key,value" line into a pair.
        JavaPairRDD<String, Integer> pairRDD = partLines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                String[] splits = s.trim().split(",");
                return new Tuple2<String, Integer>(splits[0], Integer.parseInt(splits[1]));
            }
        });

        // Sum the values of each key.
        JavaPairRDD<String, Integer> reducePairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // Take the first 10 pairs in comparator order, i.e. the 10 largest totals.
        List<Tuple2<String, Integer>> top10 = reducePairRDD.takeOrdered(10, Comp.INSTANCE);

        System.out.println("--------------------top list------------------");
        for (Tuple2<String, Integer> tuple2 : top10) {
            System.out.println(tuple2._2() + " " + tuple2._1());
        }

        sc.stop();
    }
}
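
Option C (top) has no listing in this post. As far as I understand the Spark API, takeOrdered returns the first N elements in comparator order, while top returns the N largest elements under the same comparator, so top with an ascending comparator should give the same result as takeOrdered with the reversed one. The plain Java sketch below only emulates that relationship with local lists. The class TopVersusTakeOrdered and its two static methods are hypothetical stand-ins, not Spark calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class TopVersusTakeOrdered {
    // Local stand-in for takeOrdered: the first n elements in comparator order.
    static <T> List<T> takeOrdered(List<T> data, int n, Comparator<T> comp) {
        List<T> sorted = new ArrayList<>(data);
        sorted.sort(comp);
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    // Local stand-in for top: the n largest elements, largest first.
    static <T> List<T> top(List<T> data, int n, Comparator<T> comp) {
        return takeOrdered(data, n, comp.reversed());
    }

    public static void main(String[] args) {
        List<Integer> totals = Arrays.asList(7, 1, 5, 3); // hypothetical totals
        Comparator<Integer> ascending = Comparator.naturalOrder();
        System.out.println(takeOrdered(totals, 2, ascending));            // two smallest
        System.out.println(takeOrdered(totals, 2, ascending.reversed())); // two largest
        System.out.println(top(totals, 2, ascending));                    // two largest
    }
}
```

Applied to the listing above, this suggests that a version based on top would pass a comparator that compares o1._2() to o2._2() in ascending order instead of the reversed Comp. That should be checked against the Spark version in use.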
