-
package com.hdp.design;
-
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.IntWritable;
-
import org.apache.hadoop.io.MapWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.io.Writable;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-
import java.io.IOException;
-
import java.util.*;
-
-
public class MyStripesJob {
-
-
/*
-
输入数据为用户购买的商品ID列表,具体如下
-
2,3,1,4,5,2,3
-
1,2,5,2
-
4,5
-
1,3,4,1
-
3,1
-
4,2,1,5,5,3
-
map input:2,3,1,4,5,2,3
-
1,[2,2]
-
1,[3,2]
-
1,[4,1]
-
1,[5,1]
-
2,[3,2]
-
2,[4,1]
-
2,[5,1]
-
2,[3,2]
-
2,[4,1]
-
2,[5,1]
-
3,[4,1]
-
3,[5,1]
-
3,[4,1]
-
3,[5,1]
-
4,[5,1]
-
map input:1,2,5,2
-
1,[2,2]
-
1,[5,1]
-
2,[5,1]
-
2,[5,1]
-
map input:4,5
-
4,[5,1]
-
map input:1,3,4,1
-
1,[3,1]
-
1,[4,1]
-
1,[3,1]
-
1,[4,1]
-
3,[4,1]
-
map input:3,1
-
1,[3,1]
-
map input:4,2,1,5,5,3
-
1,[2,1]
-
1,[3,1]
-
1,[4,1]
-
1,[5,2]
-
2,[3,1]
-
2,[4,1]
-
2,[5,2]
-
3,[4,1]
-
3,[5,2]
-
4,[5,2]
-
*/
-
public static class MyStripesMapper extends Mapper<Object, Text, Text, MapWritable>{
-
@Override
-
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
-
String []arrays = value.toString().split(",");
-
System.out.println("map input:"+ value.toString());
-
-
//对输入数据排序,避免将同时购买商品[1,2]和[2,1]分别计数
-
List<String> list = Arrays.asList(arrays);
-
Collections.sort(list);
-
for (int i=0;i<list.size();i++){
-
MapWritable mapWritable = new MapWritable();
-
String curPrdId = list.get(i);
-
for (int j=i+1;j<list.size();j++){
-
String nextPrdId = list.get(j);
-
Text prdIdKey = new Text(nextPrdId);
-
if(curPrdId.equals(nextPrdId)){//过滤相同的商品ID
-
continue;
-
}
-
-
//对购买商品ID次数累计
-
if(mapWritable.get(prdIdKey) ==null){
-
mapWritable.put(prdIdKey, new IntWritable(1));
-
}else {
-
IntWritable cnt = (IntWritable) mapWritable.get(prdIdKey);
-
cnt.set(cnt.get()+1);
-
mapWritable.put(prdIdKey,cnt);
-
}
-
}
-
-
if(mapWritable.size()>0){
-
System.out.println("map out:");
-
for (MapWritable.Entry tmpMap:mapWritable.entrySet()){
-
System.out.println(curPrdId+",["+tmpMap.getKey().toString() +","+ tmpMap.getValue()+"]");
-
}
-
context.write(new Text(curPrdId), mapWritable);
-
}
-
}
-
}
-
}
-
-
/*
-
reduce 输出
-
[1,2] 5
-
[1,3] 6
-
[1,4] 4
-
[1,5] 4
-
[2,3] 5
-
[2,4] 3
-
[2,5] 6
-
[3,4] 4
-
[3,5] 4
-
[4,5] 4
-
*
-
* */
-
public static class MyStripesReduce extends Reducer<Text, MapWritable, Text,Text>{
-
@Override
-
protected void reduce(Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
-
-
System.out.println("reduce input:"+key+","+ values.toString());
-
HashMap<String, Integer> map = new HashMap<String, Integer>();
-
for (MapWritable value: values){
-
for (Writable mapKey:value.keySet()){
-
String strKey = mapKey.toString();
-
//对收到的数据在次进行累加
-
int num = Integer.parseInt(value.get(mapKey).toString());
-
if(map.get(strKey)==null){
-
map.put(strKey,num);
-
}else {
-
map.put(strKey, (Integer)map.get(strKey)+num);
-
}
-
}
-
}
-
-
for (Map.Entry<String, Integer> tmpMap : map.entrySet()){
-
String outKey = "["+key.toString()+","+tmpMap.getKey()+"]";
-
System.out.println("reduce out:"+ outKey+ ","+ String.valueOf(tmpMap.getValue()));
-
context.write(new Text(outKey), new Text( String.valueOf(tmpMap.getValue())));
-
}
-
-
}
-
}
-
-
public static void main(String []args){
-
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("MyStripesJob");
-
job.setJarByClass(MyStripesJob.class);
-
job.setMapperClass(MyStripesMapper.class);
-
job.setMapOutputKeyClass(Text.class);
-
job.setMapOutputValueClass(MapWritable.class);
-
-
job.setReducerClass(MyStripesReduce.class);
-
job.setOutputValueClass(Text.class);
-
job.setOutputKeyClass(Text.class);
-
-
job.setNumReduceTasks(1);
-
-
FileInputFormat.setInputPaths(job, new Path(args[0]));
-
FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
FileSystem.get(job.getConfiguration()).delete(new Path(args[1]), true);
-
-
System.out.println(job.waitForCompletion(true));
-
} catch (IOException e) {
-
e.printStackTrace();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
}
-
-
}
-
}
阅读(1310) | 评论(0) | 转发(0) |