Spark连接有2种方案
方案一:
使用spark自带的union函数,union函数要求连接的两个RDD类型必须一致。具体思路与MapReduce实现join一致,即对两个数据源分别打标签A、B(下面的代码中标签为"L"和"P"),然后在reduce阶段把key相同的值连接起来,最后再按业务要求对数据进行处理。
方案二:
使用spark提供的leftOuterJoin函数,对两个RDD进行左外连接,然后对数据进行处理。
相比而言,方案二更简洁些,建议采用方案二
方案一实现:
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.api.java.function.Function;
-
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
import org.apache.spark.api.java.function.PairFunction;
-
import scala.Tuple2;
-
-
import java.util.*;
-
-
public class LeftJoin {
-
public static void main(String []args){
-
SparkConf conf = new SparkConf();
-
conf.setMaster("local").setAppName("LeftJoin");
-
JavaSparkContext sc = new JavaSparkContext(conf);
-
JavaRDD<String> userLines =sc.textFile("E:\\tmp\\input\\user.txt");
-
-
JavaPairRDD<String, Tuple2<String, String>> userPairRDD = userLines.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
-
@Override
-
public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
-
String []split = s.split("\t");
-
Tuple2<String,String> tuple2 = new Tuple2<String, String>("L",split[1]);
-
return new Tuple2<String, Tuple2<String, String>>(split[0], tuple2);
-
}
-
});
-
-
List<Tuple2<String, Tuple2<String, String>>> userList = userPairRDD.collect();
-
System.out.println("---------------user list-----------------------");
-
for (Tuple2<String, Tuple2<String, String>> tuple2:userList){
-
System.out.println(tuple2._1() +" "+ tuple2._2()._1() + " "+ tuple2._2()._2());
-
-
}
-
-
JavaRDD<String> transRDD = sc.textFile("E:\\tmp\\input\\trans.txt");
-
-
JavaPairRDD<String,Tuple2<String, String>> transPairRDD = transRDD.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
-
@Override
-
public Tuple2<String, Tuple2<String, String>> call(String s) throws Exception {
-
String []split = s.split("\t");
-
Tuple2<String, String> tuple2 = new Tuple2<String, String>("P",split[1]);
-
return new Tuple2<String, Tuple2<String, String>>(split[2], tuple2);
-
}
-
});
-
-
List<Tuple2<String, Tuple2<String, String>>> transList = transPairRDD.collect();
-
System.out.println("---------------trans list-----------------------");
-
for (Tuple2<String, Tuple2<String, String>> tuple2:transList){
-
System.out.println(tuple2._1() +" "+ tuple2._2()._1() + " "+ tuple2._2()._2());
-
-
}
-
-
JavaPairRDD<String,Tuple2<String,String>> allPairRDD = transPairRDD.union(userPairRDD);
-
-
List<Tuple2<String, Tuple2<String, String>>> allPairList = allPairRDD.collect();
-
System.out.println("---------------all list-----------------------");
-
for (Tuple2<String, Tuple2<String, String>> tuple2:allPairList){
-
System.out.println(tuple2._1() +" "+ tuple2._2()._1() + " "+ tuple2._2()._2());
-
}
-
-
JavaPairRDD<String,Iterable<Tuple2<String,String>>> groupPairRDD = allPairRDD.groupByKey();
-
List<Tuple2<String, Iterable<Tuple2<String, String>>>> groupList= groupPairRDD.collect();
-
System.out.println("---------------group list-----------------------");
-
for (Tuple2<String, Iterable<Tuple2<String, String>>> tuple2: groupList){
-
String key = tuple2._1();
-
System.out.println(key +" "+ tuple2._2());
-
// System.out.println("-------"+key+"-------");
-
// Iterable> iterable =tuple2._2();
-
// for (Tuple2 tuple21:iterable){
-
// System.out.println(tuple21._1()+" "+ tuple21._2());
-
// }
-
}
-
-
JavaPairRDD<String,String> userTransPairRDD = groupPairRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Iterable<Tuple2<String, String>>>, String, String>() {
-
@Override
-
public Iterator<Tuple2<String, String>> call(Tuple2<String, Iterable<Tuple2<String, String>>> iterableTuple2) throws Exception {
-
String key = iterableTuple2._1();
-
Iterable<Tuple2<String, String>> values = iterableTuple2._2();
-
String user="UNKNOWN";
-
List<String> trans = new ArrayList<String>();
-
for (Tuple2<String, String> tuple2: values){
-
String tuple2Key = tuple2._1();
-
String tuple2Value = tuple2._2();
-
System.out.println(tuple2Key +" "+tuple2Value);
-
if ("L".equals(tuple2Key)){
-
user =tuple2Value;
-
}else {
-
trans.add(tuple2Value);
-
}
-
}
-
-
List<Tuple2<String,String>> tuple2List = new ArrayList<Tuple2<String, String>>();
-
for (String str:trans){
-
tuple2List.add(new Tuple2<String, String>(str,user));
-
}
-
return tuple2List.iterator();
-
}
-
});
-
-
List<Tuple2<String,String>> list = userTransPairRDD.collect();
-
System.out.println("---------------flat map list-----------------------");
-
System.out.println("--- debug2 begin ---");
-
for (Tuple2<String,String> tuple2: list){
-
System.out.println(tuple2._1()+" "+ tuple2._2());
-
}
-
System.out.println("--- debug2 end ---");
-
JavaPairRDD<String, Iterable<String>> userTransGroupBy = userTransPairRDD.groupByKey();
-
-
// debug3
-
List<Tuple2<String, Iterable<String>>> debug3 = userTransGroupBy.collect();
-
System.out.println("--- debug3 begin ---");
-
for (Tuple2<String, Iterable<String>> t2 : debug3) {
-
System.out.println(t2._1+" "+ t2._2());
-
}
-
System.out.println("--- debug3 end ---");
-
-
JavaPairRDD<String,Tuple2<Set<String>, Integer>> userTransMapValues = userTransGroupBy.mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {
-
@Override
-
public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception {
-
Set<String> set = new HashSet<String>();
-
for (String str:v1){
-
set.add(str);
-
}
-
return new Tuple2<Set<String>, Integer>(set,set.size());
-
}
-
}).sortByKey();
-
-
-
System.out.println("=== Unique Locations and Counts ===");
-
List<Tuple2<String, Tuple2<Set<String>, Integer>>> debug4 = userTransMapValues.collect();
-
System.out.println("--- debug4 begin ---");
-
for (Tuple2<String, Tuple2<Set<String>, Integer>> t2 : debug4) {
-
System.out.println(t2._1+" "+t2._2);
-
}
-
System.out.println("--- debug4 end ---");
-
}
方案二实现:
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.api.java.Optional;
-
import org.apache.spark.api.java.function.Function;
-
import org.apache.spark.api.java.function.PairFunction;
-
import scala.Tuple2;
-
-
import java.io.Serializable;
-
import java.util.Comparator;
-
import java.util.HashSet;
-
import java.util.List;
-
import java.util.Set;
-
-
-
public class LeftOuterJoin {
-
public static class compare implements Comparator<String>, Serializable{
-
-
public static final compare INSTANCE = new compare();
-
@Override
-
public int compare(String o1, String o2) {
-
System.out.println("--------------compare:"+ o1+" "+o2 +" "+-1*o1.compareTo(o2));
-
return -1*o1.compareTo(o2);
-
}
-
}
-
public static void main(String []args){
-
SparkConf conf = new SparkConf();
-
conf.setMaster("local").setAppName("LeftOuterJoin");
-
-
JavaSparkContext sc= new JavaSparkContext(conf);
-
-
JavaRDD<String> userLines = sc.textFile("E:\\tmp\\input\\user.txt");
-
JavaPairRDD<String, String> userPairRDD = userLines.mapToPair(new PairFunction<String, String, String>() {
-
@Override
-
public Tuple2<String, String> call(String s) throws Exception {
-
String []splits = s.split("\t");
-
return new Tuple2<String, String> (splits[0],splits[1]);
-
}
-
});
-
-
JavaRDD<String> transRDD = sc.textFile("E:\\tmp\\input\\trans.txt");
-
-
JavaPairRDD<String,String> transPairRDD = transRDD.mapToPair(new PairFunction<String, String, String>() {
-
@Override
-
public Tuple2<String, String> call(String s) throws Exception {
-
String []splits = s.split("\t");
-
return new Tuple2<String, String> (splits[2],splits[1]);
-
}
-
});
-
-
JavaPairRDD<String,Tuple2<String,Optional<String>>> joinRDD= transPairRDD.leftOuterJoin(userPairRDD);
-
List<Tuple2<String, Tuple2<String, Optional<String>>>> list =joinRDD.collect();
-
for (Tuple2<String, Tuple2<String, Optional<String>>> tuple2:list){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
-
JavaPairRDD<String,String> userTransPairRDD = joinRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Optional<String>>>, String, String>() {
-
@Override
-
public Tuple2<String, String> call(Tuple2<String, Tuple2<String, Optional<String>>> stringTuple2Tuple2) throws Exception {
-
return new Tuple2<String, String>(stringTuple2Tuple2._2()._1(), stringTuple2Tuple2._2()._2().get());
-
}
-
});
-
-
List<Tuple2<String, String>> pairList = userTransPairRDD.collect();
-
System.out.println("-------------debug-------------");
-
for (Tuple2<String,String> tuple2: pairList){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
-
JavaPairRDD<String, Iterable<String>> groupByPairRDD =userTransPairRDD.groupByKey();
-
-
List<Tuple2<String, Iterable<String>>> groupByList = groupByPairRDD.collect();
-
System.out.println("-------------debug-------------");
-
for (Tuple2<String,Iterable<String>> tuple2: groupByList){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
-
JavaPairRDD<String, Tuple2<Set<String>, Integer>> setPairRDD= groupByPairRDD.mapValues(new Function<Iterable<String>, Tuple2<Set<String>, Integer>>() {
-
@Override
-
public Tuple2<Set<String>, Integer> call(Iterable<String> v1) throws Exception {
-
Set<String> set = new HashSet<String>();
-
-
for (String str: v1){
-
set.add(str);
-
}
-
return new Tuple2<Set<String>, Integer>(set, set.size());
-
}
-
});
-
}
阅读(4563) | 评论(0) | 转发(0) |