假设有如下数据:
100,200 300 400 500 600
200,100 300 400
300,100 200 400 500
400,100 200 300
500,100 300
600,100
每行逗号前为用户,逗号后为该用户的好友列表(好友之间以空格分隔)。例如,用户100的好友是200 300 400 500 600,用户200的好友是100 300 400,因此两者的共同好友是300和400。
步骤:
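1、对每一行,为该用户的每个好友生成一个键值对:key为<用户,好友>(两个ID按从小到大排序,这样用户A一行产生的记录和用户B一行产生的记录会落到同一个key上),value为该用户的完整好友列表;若用户只有一个好友,则value为空列表(如下面的(100,600))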
(100,200) [200, 300, 400, 500, 600]
(100,300) [200, 300, 400, 500, 600]
(100,400) [200, 300, 400, 500, 600]
(100,500) [200, 300, 400, 500, 600]
(100,600) [200, 300, 400, 500, 600]
(100,200) [100, 300, 400]
(200,300) [100, 300, 400]
(200,400) [100, 300, 400]
(100,300) [100, 200, 400, 500]
(200,300) [100, 200, 400, 500]
(300,400) [100, 200, 400, 500]
(300,500) [100, 200, 400, 500]
(100,400) [100, 200, 300]
(200,400) [100, 200, 300]
(300,400) [100, 200, 300]
(100,500) [100, 300]
(300,500) [100, 300]
(100,600) []
2、按key分组(groupByKey),得到同一个key下两位用户各自的好友列表
(300,400) [[100, 200, 400, 500], [100, 200, 300]]
(100,200) [[200, 300, 400, 500, 600], [100, 300, 400]]
(300,500) [[100, 200, 400, 500], [100, 300]]
(100,500) [[200, 300, 400, 500, 600], [100, 300]]
(200,300) [[100, 300, 400], [100, 200, 400, 500]]
(100,600) [[200, 300, 400, 500, 600], []]
(100,300) [[200, 300, 400, 500, 600], [100, 200, 400, 500]]
(200,400) [[100, 300, 400], [100, 200, 300]]
(100,400) [[200, 300, 400, 500, 600], [100, 200, 300]]
3、对同一个key下的所有好友列表求交集,结果即为这两位用户的共同好友
(300,400) [100, 200]
(100,200) [400, 300]
(300,500) [100]
(100,500) [300]
(200,300) [400, 100]
(100,600) []
(100,300) [400, 500, 200]
(200,400) [100, 300]
(100,400) [200, 300]
具体实现如下:
-
import org.apache.spark.SparkConf;
-
import org.apache.spark.api.java.JavaPairRDD;
-
import org.apache.spark.api.java.JavaRDD;
-
import org.apache.spark.api.java.JavaSparkContext;
-
import org.apache.spark.api.java.function.Function;
-
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
import scala.Tuple2;
-
-
import java.util.*;
-
-
public class FindCommonFriends {
-
public static Tuple2<Long,Long> buildSortKey(Long person, Long friend){
-
if(person>friend){
-
return new Tuple2<>(friend, person);
-
}else {
-
return new Tuple2<>(person,friend);
-
}
-
}
-
-
public static List<Long> iterableToList(Iterable<Long> iterable) {
-
List<Long> list = new ArrayList<Long>();
-
for (Long item : iterable) {
-
list.add(item);
-
}
-
return list;
-
}
-
-
public static void main(final String []args){
-
SparkConf conf = new SparkConf();
-
conf.setMaster("local").setAppName("FindCommonFriends");
-
JavaSparkContext sc = new JavaSparkContext(conf);
-
final JavaRDD<String> lines = sc.textFile("E:\\tmp\\input\\commonfriend.txt");
-
-
JavaPairRDD<Tuple2<Long,Long>,Iterable<Long>> pairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, Tuple2<Long, Long>, Iterable<Long>>() {
-
@Override
-
public Iterator<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> call(String s) throws Exception {
-
String []arrays = s.split(",");
-
//获取用户
-
Long person = Long.parseLong(arrays[0]);
-
//获取用户好友
-
String []friends = arrays[1].split(" ");
-
List<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> resultList = new ArrayList<Tuple2<Tuple2<Long, Long>, Iterable<Long>>>();
-
List<Long> friendList = new ArrayList<Long>();
-
if(friends.length==1){// 只有1个用户
-
Tuple2<Long,Long> key = FindCommonFriends.buildSortKey(person, Long.parseLong(friends[0]));
-
resultList.add(new Tuple2<Tuple2<Long, Long>, Iterable<Long>>(key, new ArrayList<Long>() ));
-
return resultList.iterator();
-
}
-
-
for (String strFriend:friends){
-
Long friend = Long.parseLong(strFriend);
-
friendList.add(friend);
-
}
-
-
//构建<用户,好友> 为key,用户好友列表为value的键值对
-
for (Long friend: friendList){
-
//对key排序,并返回
-
Tuple2<Long,Long> key = FindCommonFriends.buildSortKey(person,friend);
-
resultList.add(new Tuple2<Tuple2<Long, Long>, Iterable<Long>>(key,friendList));
-
}
-
return resultList.iterator();
-
}
-
});
-
List<Tuple2<Tuple2<Long,Long>,Iterable<Long>>> list = pairRDD.collect();
-
System.out.println("----------------user friend-----------------");
-
for (Tuple2<Tuple2<Long,Long>,Iterable<Long>> tuple2:list){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
-
//对key进行group
-
JavaPairRDD<Tuple2<Long,Long>,Iterable<Iterable<Long>>> groupPairRDD = pairRDD.groupByKey();
-
List<Tuple2<Tuple2<Long,Long>,Iterable<Iterable<Long>>>> grouplist = groupPairRDD.collect();
-
System.out.println("----------------user friend group-----------------");
-
for (Tuple2<Tuple2<Long,Long>,Iterable<Iterable<Long>>> tuple2:grouplist){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
-
//获取共同的好友
-
JavaPairRDD<Tuple2<Long, Long>, Iterable<Long>> commonFriend= groupPairRDD.mapValues(new Function<Iterable<Iterable<Long>>, Iterable<Long>>() {
-
@Override
-
public Iterable<Long> call(Iterable<Iterable<Long>> allFriendList) throws Exception {
-
int allFriendListSize=0;//好友列表个数据
-
Map<Long, Integer> map = new HashMap<Long, Integer>();//统计好友在同一key中出现的次数
-
for (Iterable<Long> singleFriendList:allFriendList){
-
allFriendListSize++;
-
List<Long> list = FindCommonFriends.iterableToList(singleFriendList);
-
if(list==null||list.size()==0){
-
continue;
-
}
-
-
for (Long friend:list){
-
Integer count=map.get(friend);
-
if(count==null){
-
map.put(friend,1);
-
}else {
-
map.put(friend,++count);
-
}
-
}
-
}
-
-
List<Long> commonList = new ArrayList<Long>();
-
for (Map.Entry<Long, Integer> entry: map.entrySet()){
-
//如果好友在好友列表中出现的次数与 好友列表数相同,则表明是共同好友
-
if(entry.getValue() == allFriendListSize){
-
commonList.add(entry.getKey());
-
}
-
}
-
return commonList;
-
}
-
});
-
-
List<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> commonFriendList = commonFriend.collect();
-
System.out.println("----------------common friend list-----------------");
-
for (Tuple2<Tuple2<Long, Long>, Iterable<Long>> tuple2:commonFriendList){
-
System.out.println(tuple2._1() +" "+ tuple2._2());
-
}
-
}
-
}