全部博文(247)
分类: HADOOP
2013-07-31 12:06:02
以下为实现时用到的包及其职责
|
包含检测迭代能否停止的 MapReduce 任务的一些类 |
|
程序的入口 |
|
图的基本建模 |
|
每一次迭代的 MapReduce 任务中的一些类 |
|
一些辅助类 |
1
CHKMapper 类
package com.ouyang.chk;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import com.ouyang.graph.Color; import com.ouyang.graph.Node;
public class CHKMapper extends Mapper {
public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException {
Node node = new Node(value.toString()); //if the color is gray , ouput<1,1> if(node.getColor() == Color.gray) { context.write(new IntWritable(1), new IntWritable(1)); } else { context.write(new IntWritable(1), new IntWritable(0)); } }
}
|
CHKReducer 类
package com.ouyang.chk;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer;
public class CHKReducer extends Reducer {
public void reduce(IntWritable key, Iterable Context context ) throws IOException, InterruptedException { int sum = 0;
for (IntWritable tempint: values) { sum += tempint.get(); }
context.write(new IntWritable(1), new IntWritable(sum)); }
} |
2
SSPMapper 类
package com.ouyang.ssp;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import com.ouyang.graph.Color; import com.ouyang.graph.Edge; import com.ouyang.graph.Node;
public class SSPMapper extends Mapper { public void map(LongWritable key, Text value, Context context ) throws IOException, InterruptedException { //Get the logic Node object from the record string Node thisNode = new Node(value.toString());
if(thisNode.getColor() == Color.gray) { for(Edge e : thisNode.getEdges()) { Node node = new Node(e.getEndPoint()); node.setColor(Color.gray); node.setDistance(thisNode.getDistance() + e.getWeight()); context.write(new IntWritable(node.getId()), new Text(node.getText())); }
thisNode.setColor(Color.black); }
context.write(new IntWritable(thisNode.getId()), new Text(thisNode.getText())); } }
|
SSPReducer 类
package com.ouyang.ssp;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import com.ouyang.graph.Color; import com.ouyang.graph.Node;
public class SSPReducer extends Reducer {
public void reduce(IntWritable key, Iterable {
Node node = new Node(key.get());
for(Text value: values) { Node tmpNode = new Node(node.getId()+"\t"+value.toString());
if(tmpNode.getColor() > node.getColor()) { node.setColor(tmpNode.getColor()); }
if(tmpNode.getDistance() < node.getDistance()) { node.setDistance(tmpNode.getDistance()); if(tmpNode.getColor() == Color.gray) {
node.setColor(Color.gray); } }
if(tmpNode.getEdges().size() > 0) { node.setEdges(tmpNode.getEdges()); } }
context.write(new IntWritable(node.getId()), new Text(node.getText())); } } |
完整的代码放在豆丁网:
转自http://hi.baidu.com/ouyanggaoyan/blog/item/391a332ad03084f6e6cd4023.html