在map中进行join
步骤:
1、在驱动中通过 job.addCacheFile 将HDFS上的文件加入分布式缓存中
2、在map的 setup函数中,读取缓存内容,放入HashMap中
3、在map函数中逐行读取输入记录,根据key从HashMap中查找对应的值,在map中完成join
具体实现如下:
-
import org.apache.commons.lang.StringUtils;
-
import org.apache.commons.logging.Log;
-
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.fs.FSDataInputStream;
-
import org.apache.hadoop.fs.FileSystem;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.io.LongWritable;
-
import org.apache.hadoop.io.NullWritable;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.apache.hadoop.util.GenericOptionsParser;
-
-
import java.io.BufferedReader;
-
import java.io.FileNotFoundException;
-
import java.io.IOException;
-
import java.io.InputStreamReader;
-
import java.net.URI;
-
import java.net.URISyntaxException;
-
import java.util.HashMap;
-
-
-
public class MapperJoinStatJob {
-
private static final Log LOG = LogFactory.getLog(MapperJoinStatJob.class);
-
-
public static void main(String[] args){
-
-
try {
-
Job job = Job.getInstance();
-
job.setJobName("MapperJoinStatJob");
-
job.setJarByClass(MapperJoinStatJob.class);
-
-
String []array = new GenericOptionsParser(job.getConfiguration(),args).getRemainingArgs();
-
FileInputFormat.setInputPaths(job,new Path(array[0]));
-
job.addCacheFile(new URI(array[1]));
-
FileOutputFormat.setOutputPath(job,new Path(array[2]));
-
-
job.setMapperClass(MapperJoinStatJob.MapperJoinStatMapper.class);
-
job.setMapOutputKeyClass(Text.class);
-
job.setMapOutputValueClass(Text.class);
-
-
job.setReducerClass(MapperJoinStatJob.ReducerJoinStatReducer.class);
-
job.setNumReduceTasks(1);
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
boolean ret = job.waitForCompletion(true);
-
LOG.info("ret:"+ret);
-
} catch (IOException e) {
-
e.printStackTrace();
-
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ClassNotFoundException e) {
-
e.printStackTrace();
-
} catch (URISyntaxException e) {
-
e.printStackTrace();
-
}
-
-
}
-
public static class MapperJoinStatMapper extends Mapper<LongWritable, Text, Text, Text> {
-
private HashMap<String, String> nameMap = new HashMap<String,String>();
-
FileSystem fileSystem=null;
-
@Override
-
protected void setup(Mapper.Context context) throws IOException, InterruptedException {
-
System.out.println("MapperJoinStatMapper setup start" );
-
try {
-
URI[] localPaths = context.getCacheFiles();
-
for (URI uri:localPaths){
-
LOG.info("localPaths:"+uri.toString());
-
}
-
-
if(localPaths.length==0 ){
-
throw new FileNotFoundException("Distributed cache file not found.");
-
}
-
-
FileSystem fileSystem = FileSystem.get(context.getConfiguration());
-
FSDataInputStream fsData = fileSystem.open(new Path(localPaths[0].getPath()));
-
BufferedReader bf = new BufferedReader(new InputStreamReader(fsData));
-
String strData="";
-
while ((strData=bf.readLine())!=null){
-
String []arrays = strData.split("\t");
-
this.nameMap.put(arrays[0],arrays[1]);
-
}
-
}catch (Exception ex){
-
System.out.println("Exception");
-
ex.printStackTrace();
-
}
-
-
}
-
@Override
-
protected void map(LongWritable key, Text value,
-
Context context) throws IOException, InterruptedException{
-
String strValue = value.toString();
-
if(StringUtils.isEmpty(strValue)){
-
LOG.info("strValue is null");
-
return;
-
}
-
String []arrs = strValue.split("\t");
-
String name = nameMap.get(arrs[0]);
-
context.write(new Text(arrs[0] +"\t"+ name), new Text(arrs[1] ));
-
}
-
}
-
-
public static class ReducerJoinStatReducer extends Reducer<Text, Text, NullWritable,Text>{
-
-
@Override
-
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
-
int count=0;
-
for (Text text:values){
-
count++;
-
}
-
context.write(NullWritable.get(),new Text(key +"\t"+String.valueOf(count)));
-
}
-
}
-
-
-
}
阅读(656) | 评论(0) | 转发(0) |