Chinaunix首页 | 论坛 | 博客
  • 博客访问: 823870
  • 博文数量: 247
  • 博客积分: 166
  • 博客等级: 入伍新兵
  • 技术积分: 2199
  • 用 户 组: 普通用户
  • 注册时间: 2012-11-15 16:10
文章分类

全部博文(247)

文章存档

2017年(1)

2015年(63)

2014年(80)

2013年(94)

2012年(9)

分类: HADOOP

2013-07-31 12:06:02

以下为实现时用到的包及其职责

 

包含 检测迭代能否停止的mapreduce任务的一些类

程序的入口

图的基本建模

每一次迭代的mapreduce任务中的一些类

一些辅助类

 

1 

 

package com.ouyang.chk;

 

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

import com.ouyang.graph.Color;

import com.ouyang.graph.Node;

 

public class CHKMapper extends   Mapper

{

 

public void map(LongWritable key, Text value,

Context context

     ) throws IOException, InterruptedException

{

 

Node node = new Node(value.toString());

//if the color is gray , ouput<1,1>

if(node.getColor() == Color.gray)

{

context.write(new IntWritable(1), new IntWritable(1));

}

else

{

context.write(new IntWritable(1), new IntWritable(0));

}

}

 

}


 


package com.ouyang.chk;

 

import java.io.IOException;

 

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.mapreduce.Reducer;

 

 

public class CHKReducer extends   Reducer

{

 

public void reduce(IntWritable key, Iterable values,

Context context

   ) throws IOException, InterruptedException

{

int sum = 0;

 

 

for (IntWritable tempint: values)

{

sum +=   tempint.get();

     }

 

context.write(new IntWritable(1), new IntWritable(sum));

}

 

}


转自http://hi.baidu.com/ouyanggaoyan/blog/item/0c2da3091424ae3ce9248849.html

2 

 

SSPMapper 

package com.ouyang.ssp;

 

 

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

 

import com.ouyang.graph.Color;

import com.ouyang.graph.Edge;

import com.ouyang.graph.Node;

 

 

public   class SSPMapper extends   Mapper

{

public void map(LongWritable key, Text value,

Context context

     ) throws IOException, InterruptedException

{

//Get the logic Node object from the record string

Node thisNode = new Node(value.toString());

 

if(thisNode.getColor() == Color.gray)

{

for(Edge e : thisNode.getEdges())

{

Node node = new Node(e.getEndPoint());

node.setColor(Color.gray);

node.setDistance(thisNode.getDistance() + e.getWeight());

context.write(new IntWritable(node.getId()), new Text(node.getText()));

}

 

thisNode.setColor(Color.black);

}

 

context.write(new IntWritable(thisNode.getId()), new Text(thisNode.getText()));

}

}

 

 

SSPReducer

 

package com.ouyang.ssp;

 

 

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import com.ouyang.graph.Color;

import com.ouyang.graph.Node;

 

 

public class SSPReducer   extends   Reducer

{

  

   public void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException

   {

 

     Node node = new Node(key.get());

    

     for(Text value: values)

     {

      Node tmpNode = new Node(node.getId()+"\t"+value.toString());

  

      if(tmpNode.getColor() > node.getColor())

      {

      node.setColor(tmpNode.getColor());

      }

     

      if(tmpNode.getDistance() < node.getDistance())

      {

      node.setDistance(tmpNode.getDistance());

      if(tmpNode.getColor() == Color.gray)

         {

  

      node.setColor(Color.gray);

         }

      }

     

      if(tmpNode.getEdges().size() > 0)

      {

      node.setEdges(tmpNode.getEdges());

      }

     }

    

     context.write(new IntWritable(node.getId()), new Text(node.getText()));

   }

}

完整的代码放在豆丁网:

转自http://hi.baidu.com/ouyanggaoyan/blog/item/391a332ad03084f6e6cd4023.html

阅读(2405) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~