Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1080167
  • 博文数量: 143
  • 博客积分: 969
  • 博客等级: 准尉
  • 技术积分: 1765
  • 用 户 组: 普通用户
  • 注册时间: 2011-07-30 12:09
文章分类

全部博文(143)

文章存档

2023年(4)

2021年(2)

2020年(4)

2019年(4)

2018年(33)

2017年(6)

2016年(13)

2014年(7)

2013年(23)

2012年(33)

2011年(14)

我的朋友

分类: 大数据

2018-11-15 20:59:44

假设有如下数据:
100,200 300 400 500 600
200,100 300 400
300,100 200 400 500
400,100 200 300
500,100 300
600,100
逗号前表示用户,逗号后表示好友列表,用户100和200的共同好友是300和400
步骤:
1、获取以<用户,好友>为key,好友列表为value的键值对
(100,200) [200, 300, 400, 500, 600]
(100,300) [200, 300, 400, 500, 600]
(100,400) [200, 300, 400, 500, 600]
(100,500) [200, 300, 400, 500, 600]
(100,600) [200, 300, 400, 500, 600]
(100,200) [100, 300, 400]
(200,300) [100, 300, 400]
(200,400) [100, 300, 400]
(100,300) [100, 200, 400, 500]
(200,300) [100, 200, 400, 500]
(300,400) [100, 200, 400, 500]
(300,500) [100, 200, 400, 500]
(100,400) [100, 200, 300]
(200,400) [100, 200, 300]
(300,400) [100, 200, 300]
(100,500) [100, 300]
(300,500) [100, 300]
(100,600) []
2、对key进行group,获取同一个key的好友列表
(300,400) [[100, 200, 400, 500], [100, 200, 300]]
(100,200) [[200, 300, 400, 500, 600], [100, 300, 400]]
(300,500) [[100, 200, 400, 500], [100, 300]]
(100,500) [[200, 300, 400, 500, 600], [100, 300]]
(200,300) [[100, 300, 400], [100, 200, 400, 500]]
(100,600) [[200, 300, 400, 500, 600], []]
(100,300) [[200, 300, 400, 500, 600], [100, 200, 400, 500]]
(200,400) [[100, 300, 400], [100, 200, 300]]
(100,400) [[200, 300, 400, 500, 600], [100, 200, 300]]
3、查找共同好友列表
(300,400) [100, 200]
(100,200) [400, 300]
(300,500) [100]
(100,500) [300]
(200,300) [400, 100]
(100,600) []
(100,300) [400, 500, 200]
(200,400) [100, 300]
(100,400) [200, 300]

具体实现如下:

点击(此处)折叠或打开

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.api.java.JavaPairRDD;
  3. import org.apache.spark.api.java.JavaRDD;
  4. import org.apache.spark.api.java.JavaSparkContext;
  5. import org.apache.spark.api.java.function.Function;
  6. import org.apache.spark.api.java.function.PairFlatMapFunction;
  7. import scala.Tuple2;

  8. import java.util.*;

  9. public class FindCommonFriends {
  10.     public static Tuple2<Long,Long> buildSortKey(Long person, Long friend){
  11.         if(person>friend){
  12.             return new Tuple2<>(friend, person);
  13.         }else {
  14.             return new Tuple2<>(person,friend);
  15.         }
  16.     }

  17.     public static List<Long> iterableToList(Iterable<Long> iterable) {
  18.         List<Long> list = new ArrayList<Long>();
  19.         for (Long item : iterable) {
  20.             list.add(item);
  21.         }
  22.         return list;
  23.     }

  24.     public static void main(final String []args){
  25.         SparkConf conf = new SparkConf();
  26.         conf.setMaster("local").setAppName("FindCommonFriends");
  27.         JavaSparkContext sc = new JavaSparkContext(conf);
  28.         final JavaRDD<String> lines = sc.textFile("E:\\tmp\\input\\commonfriend.txt");

  29.         JavaPairRDD<Tuple2<Long,Long>,Iterable<Long>> pairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, Tuple2<Long, Long>, Iterable<Long>>() {
  30.             @Override
  31.             public Iterator<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> call(String s) throws Exception {
  32.                 String []arrays = s.split(",");
  33.                 //获取用户
  34.                 Long person = Long.parseLong(arrays[0]);
  35.                 //获取用户好友
  36.                 String []friends = arrays[1].split(" ");
  37.                 List<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> resultList = new ArrayList<Tuple2<Tuple2<Long, Long>, Iterable<Long>>>();
  38.                 List<Long> friendList = new ArrayList<Long>();
  39.                 if(friends.length==1){// 只有1个用户
  40.                     Tuple2<Long,Long> key = FindCommonFriends.buildSortKey(person, Long.parseLong(friends[0]));
  41.                      resultList.add(new Tuple2<Tuple2<Long, Long>, Iterable<Long>>(key, new ArrayList<Long>() ));
  42.                     return resultList.iterator();
  43.                 }

  44.                 for (String strFriend:friends){
  45.                     Long friend = Long.parseLong(strFriend);
  46.                     friendList.add(friend);
  47.                 }

  48.                 //构建<用户,好友> 为key,用户好友列表为value的键值对
  49.                 for (Long friend: friendList){
  50.                     //对key排序,并返回
  51.                     Tuple2<Long,Long> key = FindCommonFriends.buildSortKey(person,friend);
  52.                     resultList.add(new Tuple2<Tuple2<Long, Long>, Iterable<Long>>(key,friendList));
  53.                 }
  54.                 return resultList.iterator();
  55.             }
  56.         });
  57.         List<Tuple2<Tuple2<Long,Long>,Iterable<Long>>> list = pairRDD.collect();
  58.         System.out.println("----------------user friend-----------------");
  59.         for (Tuple2<Tuple2<Long,Long>,Iterable<Long>> tuple2:list){
  60.             System.out.println(tuple2._1() +" "+ tuple2._2());
  61.         }

  62.         //对key进行group
  63.         JavaPairRDD<Tuple2<Long,Long>,Iterable<Iterable<Long>>> groupPairRDD = pairRDD.groupByKey();
  64.         List<Tuple2<Tuple2<Long,Long>,Iterable<Iterable<Long>>>> grouplist = groupPairRDD.collect();
  65.         System.out.println("----------------user friend group-----------------");
  66.         for (Tuple2<Tuple2<Long,Long>,Iterable<Iterable<Long>>> tuple2:grouplist){
  67.             System.out.println(tuple2._1() +" "+ tuple2._2());
  68.         }

  69.         //获取共同的好友
  70.         JavaPairRDD<Tuple2<Long, Long>, Iterable<Long>> commonFriend= groupPairRDD.mapValues(new Function<Iterable<Iterable<Long>>, Iterable<Long>>() {
  71.             @Override
  72.             public Iterable<Long> call(Iterable<Iterable<Long>> allFriendList) throws Exception {
  73.                 int allFriendListSize=0;//好友列表个数据
  74.                 Map<Long, Integer> map = new HashMap<Long, Integer>();//统计好友在同一key中出现的次数
  75.                 for (Iterable<Long> singleFriendList:allFriendList){
  76.                     allFriendListSize++;
  77.                     List<Long> list = FindCommonFriends.iterableToList(singleFriendList);
  78.                     if(list==null||list.size()==0){
  79.                         continue;
  80.                     }

  81.                     for (Long friend:list){
  82.                         Integer count=map.get(friend);
  83.                         if(count==null){
  84.                             map.put(friend,1);
  85.                         }else {
  86.                             map.put(friend,++count);
  87.                         }
  88.                     }
  89.                 }

  90.                 List<Long> commonList = new ArrayList<Long>();
  91.                 for (Map.Entry<Long, Integer> entry: map.entrySet()){
  92.                     //如果好友在好友列表中出现的次数与 好友列表数相同,则表明是共同好友
  93.                     if(entry.getValue() == allFriendListSize){
  94.                         commonList.add(entry.getKey());
  95.                     }
  96.                 }
  97.                 return commonList;
  98.             }
  99.         });
  100.         
  101.         List<Tuple2<Tuple2<Long, Long>, Iterable<Long>>> commonFriendList = commonFriend.collect();
  102.         System.out.println("----------------common friend list-----------------");
  103.         for (Tuple2<Tuple2<Long, Long>, Iterable<Long>> tuple2:commonFriendList){
  104.             System.out.println(tuple2._1() +" "+ tuple2._2());
  105.         }
  106.     }
  107. }

阅读(4911) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~