假设有如下数据:
100,200,300,400,500,600
200,100,300,400
300,100,200,400,500
400,100,200,300
500,100,300
600,100
其中第一列表示用户ID,其余列表示用户的好友ID,我们需要统计出这些用户的共同好友
其结果如下:
[100,200]
[300, 400] //表示用户100和200的共同好友是 300,400
[100,300]
[200, 400, 500]
[100,400]
[200, 300]
[100,500]
[300]
[100,600]
[]
[200,300]
[100, 400]
[200,400]
[100, 300]
[300,400]
[100, 200]
[300,500]
[100]
实现原理如下:
1、在Mapper阶段将用户ID和好友ID作为新的Key,好友列表作为value,发送到Reducer中,这样的相同的Key发送到同一reduce中
2、Reducer阶段对,对这些value取交集,就可以得到共同好友列表,将这些好友列表输出即可。
具体实现如下:
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-
import java.io.IOException;
-
import java.util.ArrayList;
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
-
public class CommonFriendsJob {
-
/*
-
* map input
-
100,200,300,400,500,600
-
200,100,300,400
-
300,100,200,400,500
-
400,100,200,300
-
500,100,300
-
600,100
-
* */
-
public static class CommonFriendMapper extends Mapper<LongWritable, Text, Text,Text>{
-
private static final Text outKey = new Text();
-
private static final Text outValue = new Text();
-
@Override
-
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
-
String strValue = value.toString();
-
System.out.println("map input:"+ strValue);
-
String[] arrays = strValue.split(",");
-
String person = arrays[0];//第一个元素表示用户,其余的表示好友
-
String friends =strValue.substring(strValue.indexOf(",")+1);
-
for (int i=1;i<arrays.length;i++){
-
String strOutKey = "";
-
String friend = arrays[i];
-
//避免两个值相同但顺序不同的情况下发送到不同的reduce中
-
if(Integer.parseInt(person)<Integer.parseInt(friend)){
-
strOutKey="["+person+","+ friend+"]";
-
}else {
-
strOutKey="["+ friend +","+ person+"]";
-
}
-
outKey.set(strOutKey);
-
outValue.set(friends);
-
System.out.println("map out, key:"+strOutKey+" value:"+friends);
-
context.write(outKey, outValue);
-
}
-
}
-
}
-
-
/*
-
reduce out:
-
[100,200] [300, 400]
-
[100,300] [200, 400, 500]
-
[100,400] [200, 300]
-
[100,500] [300]
-
[100,600] []
-
[200,300] [100, 400]
-
[200,400] [100, 300]
-
[300,400] [100, 200]
-
[300,500] [100]
-
* */
-
public static class CommonFriendReducer extends Reducer<Text, Text, Text, Text>{
-
@Override
-
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
int num=0;
-
Map<String, Integer> countMap = new HashMap<String,Integer>();
-
for (Text value: values){
-
System.out.println("reduce input,key:"+key.toString() +"value:"+value.toString());
-
String[] friends = value.toString().split(",");
-
for (String friend:friends){//用来统计共同好友个数
-
if(countMap.get(friend)==null){
-
countMap.put(friend, 1);
-
}else {
-
countMap.put(friend,countMap.get(friend)+1);
-
}
-
}
-
num++;
-
}
-
-
List<String> commonFriendList= new ArrayList<String>();
-
for (Map.Entry<String,Integer> entry: countMap.entrySet()){
-
if(entry.getValue()==num && num!=1 ){//如果Key的个数与好友个数相同,那么是共同好友
-
commonFriendList.add(entry.getKey());
-
}
-
}
-
-
context.write(key, new Text(commonFriendList.toString()));
-
}
-
}
-
-
-
public static void main(String []args){
-
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("CommonFriendsJob");
-
job.setJarByClass(CommonFriendsJob.class);
-
-
job.setMapperClass(CommonFriendMapper.class);
-
job.setMapOutputKeyClass(Text.class);
-
job.setMapOutputValueClass(Text.class);
-
-
job.setReducerClass(CommonFriendReducer.class);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
-
job.setNumReduceTasks(1);
-
-
FileInputFormat.setInputPaths(job, new Path(args[0]));
-
FileOutputFormat.setOutputPath(job,new Path(args[1]));
-
-
FileSystem.get(job.getConfiguration()).delete(new Path(args[1]),true);
-
-
System.out.println(job.waitForCompletion(true));
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
-
}
阅读(1209) | 评论(0) | 转发(0) |