Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1106378
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2018-11-06 17:37:11


点击(此处)折叠或打开

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaPairRDD;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import org.apache.spark.api.java.function.PairFunction;
  7. import scala.Tuple2;

  8. import java.io.Serializable;
  9. import java.util.ArrayList;
  10. import java.util.Collections;
  11. import java.util.Comparator;
  12. import java.util.List;

  13. public class SecondarySort {
  14.     static List<Tuple2<Integer,Integer>> iterableToList(Iterable<Tuple2<Integer,Integer>> iterable) {
  15.         List<Tuple2<Integer,Integer>> list = new ArrayList<Tuple2<Integer,Integer>>();
  16.         for (Tuple2<Integer,Integer> item : iterable) {
  17.             list.add(item);
  18.         }
  19.         return list;
  20.     }

  21.     public static class SparkTupleComparator
  22.             implements Comparator<Tuple2<Integer, Integer>>, Serializable {

  23.         public static final SparkTupleComparator INSTANCE = new SparkTupleComparator();

  24.         private SparkTupleComparator() {
  25.         }

  26.         @Override
  27.         public int compare(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2){
  28.             return t1._1.compareTo(t2._1);
  29.         }
  30.     }
  31.     public static void main(String []args){

  32.         SparkConf sparkConf = new SparkConf();
  33.         sparkConf.setMaster("local").setAppName("SecondarySort");

  34.         JavaSparkContext sc = new JavaSparkContext(sparkConf);
  35.         JavaRDD<String> lines = sc.textFile("E:\\tmp\\input\\secondarysort.txt");

  36.         //生成键值对
  37.         JavaPairRDD<String, Tuple2<Integer, Integer>> pairRDD = lines.mapToPair(new PairFunction<String, String, Tuple2<Integer, Integer>>() {
  38.             @Override
  39.             public Tuple2<String, Tuple2<Integer, Integer>> call(String s) throws Exception {
  40.                 String[] values = s.split(",");
  41.                 Tuple2<Integer, Integer> tupleValue = new Tuple2<Integer, Integer>(Integer.parseInt(values[1]), Integer.parseInt(values[2]));
  42.                 Tuple2<String, Tuple2<Integer, Integer>> tuple = new Tuple2<String, Tuple2<Integer, Integer>>(values[0], tupleValue);
  43.                 return tuple;
  44.             }
  45.         });

  46.         List<Tuple2<String, Tuple2<Integer, Integer>>> tuple2List = pairRDD.collect();
  47.         System.out.println("k-v list:");
  48.         for (Tuple2<String, Tuple2<Integer, Integer>> tuple:tuple2List){
  49.             System.out.println(tuple._1() +" "+ tuple._2()._1() +" "+ tuple._2()._2() );
  50.         }

  51.         //根据key进行group,并按key进行排序
  52.         JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groupBySortByKeyRDD=pairRDD.groupByKey().sortByKey();
  53.         List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> groupList = groupBySortByKeyRDD.collect();
  54.         System.out.println("---------------sort by key result---------------");
  55.         for (Tuple2<String,Iterable<Tuple2<Integer, Integer>>> tuple:groupList){
  56.             for (Tuple2<Integer, Integer> value:tuple._2()){
  57.                 System.out.println("key:" +tuple._1() +" value: " +value._1() +" "+ value._2());
  58.             }
  59.         }

  60.         //对value进行排序
  61.         JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> sortRDD = groupBySortByKeyRDD.mapValues(
  62.                 new Function<Iterable<Tuple2<Integer, Integer>>, Iterable<Tuple2<Integer, Integer>>>() {
  63.             @Override
  64.             public Iterable<Tuple2<Integer, Integer>> call(Iterable<Tuple2<Integer, Integer>> v1) throws Exception {
  65.                 List<Tuple2<Integer, Integer>> newList = new ArrayList<Tuple2<Integer, Integer>>(SecondarySort.iterableToList(v1));
  66.                 Collections.sort(newList, SparkTupleComparator.INSTANCE);
  67.                 return newList;
  68.             }
  69.         });


  70.         List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> sortList = sortRDD.collect();
  71.         System.out.println("---------------result---------------");
  72.         for (Tuple2<String,Iterable<Tuple2<Integer, Integer>>> tuple:sortList){
  73.             for (Tuple2<Integer, Integer> value:tuple._2()){
  74.                 System.out.println( " key:" +tuple._1() +" value: "+value._1() +" "+ value._2());
  75.             }
  76.         }

  77.     }

阅读(3257) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~