Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1106463
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2018-11-11 15:46:31

Spark连接有2种方案
方案一:
  使用spark自带的union函数,union函数要求连接的两个RDD类型必须一致。具体思路与MapReduce实现join一致,即对两个数据源分别打标签(下面的代码中用户数据标记为"L",交易数据标记为"P"),union之后按key分组,相当于在reduce阶段把key相同的值连接起来。最后再按业务要求对数据进行处理。

方案二:
  使用spark提供的leftOuterJoin函数,对两个RDD进行左外连接(left outer join),然后对数据进行处理。

相比而言,方案二更简洁些,建议采用方案二

方案一实现:

点击(此处)折叠或打开

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaPairRDD;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import org.apache.spark.api.java.function.PairFlatMapFunction;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import scala.Tuple2;

  9. import java.util.*;

  10. public class LeftJoin {
  11.     public static void main(String []args){
  12.         SparkConf conf = new SparkConf();
  13.         conf.setMaster("local").setAppName("LeftJoin");
  14.         JavaSparkContext sc = new JavaSparkContext(conf);
  15.         JavaRDD<String> userLines =sc.textFile("E:\\tmp\\input\\user.txt");

  16.         JavaPairRDD<String, Tuple2<String, String>> userPairRDD = userLines.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
  17.             @Override
  18.             public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
  19.                 String []split = s.split("\t");
  20.                 Tuple2<String,String> tuple2 = new Tuple2<String, String>("L",split[1]);
  21.                 return new Tuple2<String, Tuple2<String, String>>(split[0], tuple2);
  22.             }
  23.         });

  24.         List<Tuple2<String, Tuple2<String, String>>> userList = userPairRDD.collect();
  25.         System.out.println("---------------user list-----------------------");
  26.         for (Tuple2<String, Tuple2<String, String>> tuple2:userList){
  27.             System.out.println(tuple2._1() +" "+ tuple2._2()._1() + " "+ tuple2._2()._2());

  28.         }

  29.         JavaRDD<String> transRDD = sc.textFile("E:\\tmp\\input\\trans.txt");

  30.         JavaPairRDD<String,Tuple2<String, String>> transPairRDD = transRDD.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
  31.             @Override
  32.             public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
  33.                 String []split = s.split("\t");
  34.                 Tuple2<String, String> tuple2 = new Tuple2<String, String>("P",split[1]);
  35.                 return new Tuple2<String, Tuple2<String, String>>(split[2], tuple2);
  36.             }
  37.         });

  38.         List<Tuple2<String, Tuple2<String, String>>> transList = transPairRDD.collect();
  39.         System.out.println("---------------trans list-----------------------");
  40.         for (Tuple2<String, Tuple2<String, String>> tuple2:transList){
  41.             System.out.println(tuple2._1() +" "+ tuple2._2()._1() + " "+ tuple2._2()._2());

  42.         }

  43.         JavaPairRDD<String,Tuple2<String,String>> allPairRDD = transPairRDD.union(userPairRDD);

  44.         List<Tuple2<String, Tuple2<String, String>>> allPairList = allPairRDD.collect();
  45.         System.out.println("---------------all list-----------------------");
  46.         for (Tuple2<String, Tuple2<String, String>> tuple2:allPairList){
  47.             System.out.println(tuple2._1() +" "+ tuple2._2()._1() + " "+ tuple2._2()._2());
  48.         }

  49.         JavaPairRDD<String,Iterable<Tuple2<String,String>>> groupPairRDD = allPairRDD.groupByKey();
  50.         List<Tuple2<String, Iterable<Tuple2<String, String>>>> groupList= groupPairRDD.collect();
  51.         System.out.println("---------------group list-----------------------");
  52.         for (Tuple2<String, Iterable<Tuple2<String, String>>> tuple2: groupList){
  53.             String key = tuple2._1();
  54.             System.out.println(key +" "+ tuple2._2());
  55. // System.out.println("-------"+key+"-------");
  56. // Iterable> iterable =tuple2._2();
  57. // for (Tuple2 tuple21:iterable){
  58. // System.out.println(tuple21._1()+" "+ tuple21._2());
  59. // }
  60.         }

  61.         JavaPairRDD<String,String> userTransPairRDD = groupPairRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, String>>>, String, String>() {
  62.             @Override
  63.             public Iterator<Tuple2<String, String>> call(Tuple2<String, Iterable<Tuple2<String, String>>> iterableTuple2) throws Exception {
  64.                 String key = iterableTuple2._1();
  65.                 Iterable<Tuple2<String, String>> values = iterableTuple2._2();
  66.                 String user="UNKNOWN";
  67.                 List<String> trans = new ArrayList<String>();
  68.                 for (Tuple2<String, String> tuple2: values){
  69.                     String tuple2Key = tuple2._1();
  70.                     String tuple2Value = tuple2._2();
  71.                     System.out.println(tuple2Key +" "+tuple2Value);
  72.                     if ("L".equals(tuple2Key)){
  73.                         user =tuple2Value;
  74.                     }else {
  75.                         trans.add(tuple2Value);
  76.                     }
  77.                 }

  78.                 List<Tuple2<String,String>> tuple2List = new ArrayList<Tuple2<String, String>>();
  79.                 for (String str:trans){
  80.                     tuple2List.add(new Tuple2<String, String>(str,user));
  81.                 }
  82.                 return tuple2List.iterator();
  83.             }
  84.         });

  85.         List<Tuple2<String,String>> list = userTransPairRDD.collect();
  86.         System.out.println("---------------flat map list-----------------------");
  87.         System.out.println("--- debug2 begin ---");
  88.         for (Tuple2<String,String> tuple2: list){
  89.             System.out.println(tuple2._1()+" "+ tuple2._2());
  90.         }
  91.         System.out.println("--- debug2 end ---");
  92.         JavaPairRDD<String, Iterable<String>> userTransGroupBy = userTransPairRDD.groupByKey();

  93.         // debug3
  94.         List<Tuple2<String, Iterable<String>>> debug3 = userTransGroupBy.collect();
  95.         System.out.println("--- debug3 begin ---");
  96.         for (Tuple2<String, Iterable<String>> t2 : debug3) {
  97.             System.out.println(t2._1+" "+ t2._2());
  98.         }
  99.         System.out.println("--- debug3 end ---");

  100.         JavaPairRDD<String,Tuple2<Set<String>, Integer>> userTransMapValues = userTransGroupBy.mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {
  101.             @Override
  102.             public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception {
  103.                 Set<String> set = new HashSet<String>();
  104.                 for (String str:v1){
  105.                     set.add(str);
  106.                 }
  107.                 return new Tuple2<Set<String>, Integer>(set,set.size());
  108.             }
  109.         }).sortByKey();


  110.         System.out.println("=== Unique Locations and Counts ===");
  111.         List<Tuple2<String, Tuple2<Set<String>, Integer>>> debug4 = userTransMapValues.collect();
  112.         System.out.println("--- debug4 begin ---");
  113.         for (Tuple2<String, Tuple2<Set<String>, Integer>> t2 : debug4) {
  114.             System.out.println(t2._1+" "+t2._2);
  115.         }
  116.         System.out.println("--- debug4 end ---");
  117.     }
方案二实现:

点击(此处)折叠或打开

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaPairRDD;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.Optional;
  6. import org.apache.spark.api.java.function.Function;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import scala.Tuple2;

  9. import java.io.Serializable;
  10. import java.util.Comparator;
  11. import java.util.HashSet;
  12. import java.util.List;
  13. import java.util.Set;


  14. public class LeftOuterJoin {
  15.     public static class compare implements Comparator<String>, Serializable{

  16.     public static final compare INSTANCE = new compare();
  17.         @Override
  18.         public int compare(String o1, String o2) {
  19.             System.out.println("--------------compare:"+ o1+" "+o2 +" "+-1*o1.compareTo(o2));
  20.             return -1*o1.compareTo(o2);
  21.         }
  22.     }
  23.     public static void main(String []args){
  24.         SparkConf conf = new SparkConf();
  25.         conf.setMaster("local").setAppName("LeftOuterJoin");

  26.         JavaSparkContext sc= new JavaSparkContext(conf);

  27.         JavaRDD<String> userLines = sc.textFile("E:\\tmp\\input\\user.txt");
  28.         JavaPairRDD<String, String> userPairRDD = userLines.mapToPair(new PairFunction<String, String, String>() {
  29.             @Override
  30.             public Tuple2<String, String> call(String s) throws Exception {
  31.                 String []splits = s.split("\t");
  32.                 return new Tuple2<String, String> (splits[0],splits[1]);
  33.             }
  34.         });

  35.         JavaRDD<String> transRDD = sc.textFile("E:\\tmp\\input\\trans.txt");

  36.         JavaPairRDD<String,String> transPairRDD = transRDD.mapToPair(new PairFunction<String, String, String>() {
  37.             @Override
  38.             public Tuple2<String, String> call(String s) throws Exception {
  39.                 String []splits = s.split("\t");
  40.                 return new Tuple2<String, String> (splits[2],splits[1]);
  41.             }
  42.         });

  43.        JavaPairRDD<String,Tuple2<String,Optional<String>>> joinRDD= transPairRDD.leftOuterJoin(userPairRDD);
  44.         List<Tuple2<String, Tuple2<String, Optional<String>>>> list =joinRDD.collect();
  45.         for (Tuple2<String, Tuple2<String, Optional<String>>> tuple2:list){
  46.             System.out.println(tuple2._1() +" "+ tuple2._2());
  47.         }

  48.         JavaPairRDD<String,String> userTransPairRDD = joinRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Optional<String>>>, String, String>() {
  49.             @Override
  50.             public Tuple2<String, String> call(Tuple2<String, Tuple2<String, Optional<String>>> stringTuple2Tuple2) throws Exception {
  51.                 return new Tuple2<String, String>(stringTuple2Tuple2._2()._1(), stringTuple2Tuple2._2()._2().get());
  52.             }
  53.         });

  54.         List<Tuple2<String, String>> pairList = userTransPairRDD.collect();
  55.         System.out.println("-------------debug-------------");
  56.         for (Tuple2<String,String> tuple2: pairList){
  57.             System.out.println(tuple2._1() +" "+ tuple2._2());
  58.         }

  59.        JavaPairRDD<String, Iterable<String>> groupByPairRDD =userTransPairRDD.groupByKey();

  60.         List<Tuple2<String, Iterable<String>>> groupByList = groupByPairRDD.collect();
  61.         System.out.println("-------------debug-------------");
  62.         for (Tuple2<String,Iterable<String>> tuple2: groupByList){
  63.             System.out.println(tuple2._1() +" "+ tuple2._2());
  64.         }

  65.         JavaPairRDD<String, Tuple2<Set<String>, Integer>> setPairRDD= groupByPairRDD.mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {
  66.              @Override
  67.              public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception {
  68.                  Set<String> set = new HashSet<String>();

  69.                  for (String str: v1){
  70.                      set.add(str);
  71.                  }
  72.                  return new Tuple2<Set<String>, Integer>(set, set.size());
  73.              }
  74.          });
  75. }


阅读(4543) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~